diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 8360847e1bd1..797562e92d2e 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -20,16 +20,8 @@ use std::str::FromStr; use std::{any::Any, sync::Arc}; -use arrow::compute::SortOptions; -use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; -use arrow_schema::Schema; -use async_trait::async_trait; -use datafusion_common::FileTypeWriterOptions; -use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema}; -use datafusion_expr::expr::Sort; -use datafusion_optimizer::utils::conjunction; -use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr}; -use futures::{future, stream, StreamExt, TryStreamExt}; +use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; +use super::PartitionedFile; use crate::datasource::file_format::file_compression_type::{ FileCompressionType, FileTypeExt, @@ -54,13 +46,21 @@ use crate::{ logical_expr::Expr, physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}, }; -use datafusion_common::FileType; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; +use arrow_schema::Schema; +use datafusion_common::{ + internal_err, plan_err, project_schema, FileType, FileTypeWriterOptions, SchemaExt, + ToDFSchema, +}; use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; +use datafusion_expr::expr::Sort; +use datafusion_optimizer::utils::conjunction; +use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr}; -use super::PartitionedFile; - -use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; +use async_trait::async_trait; +use futures::{future, stream, StreamExt, TryStreamExt}; /// 
Configuration for creating a [`ListingTable`] #[derive(Debug, Clone)] @@ -996,6 +996,9 @@ impl ListingTable { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::fs::File; + use super::*; use crate::datasource::{provider_as_source, MemTable}; use crate::execution::options::ArrowReadOptions; @@ -1010,14 +1013,13 @@ mod tests { logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, }; + use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; - use datafusion_common::assert_contains; - use datafusion_common::GetExt; - use datafusion_expr::LogicalPlanBuilder; + use datafusion_common::{assert_contains, GetExt, ScalarValue}; + use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; + use rstest::*; - use std::collections::HashMap; - use std::fs::File; use tempfile::TempDir; /// It creates dummy file and checks if it can create unbounded input executors. @@ -2048,6 +2050,7 @@ mod tests { } None => SessionContext::new(), }; + let target_partition_number = session_ctx.state().config().target_partitions(); // Create a new schema with one field called "a" of type Int32 let schema = Arc::new(Schema::new(vec![Field::new( @@ -2056,6 +2059,12 @@ mod tests { false, )])); + let filter_predicate = Expr::BinaryExpr(BinaryExpr::new( + Box::new(Expr::Column("column1".into())), + Operator::GtEq, + Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))), + )); + // Create a new batch of data to insert into the table let batch = RecordBatch::try_new( schema.clone(), @@ -2136,8 +2145,10 @@ mod tests { let source = provider_as_source(source_table); // Create a table scan logical plan to read from the source table let scan_plan = LogicalPlanBuilder::scan("source", source, None)? - .repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))? + .filter(filter_predicate)? .build()?; + // Since logical plan contains a filter, increasing parallelism is helpful. 
+ // Therefore, we will have `target_partition_number` partitions in the final plan. // Create an insert plan to insert the source data into the initial table let insert_into_table = LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?; @@ -2146,7 +2157,6 @@ mod tests { .state() .create_physical_plan(&insert_into_table) .await?; - // Execute the physical plan and collect the results let res = collect(plan, session_ctx.task_ctx()).await?; // Insert returns the number of rows written, in our case this would be 6. @@ -2178,9 +2188,9 @@ mod tests { // Assert that the batches read from the file match the expected result. assert_batches_eq!(expected, &batches); - // Assert that 6 files were added to the table + // Assert that `target_partition_number` many files were added to the table. let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 6); + assert_eq!(num_files, target_partition_number); // Create a physical plan from the insert plan let plan = session_ctx @@ -2221,9 +2231,9 @@ mod tests { // Assert that the batches read from the file after the second append match the expected result. assert_batches_eq!(expected, &batches); - // Assert that another 6 files were added to the table + // Assert that another `target_partition_number` many files were added to the table. 
let num_files = tmp_dir.path().read_dir()?.count(); - assert_eq!(num_files, 12); + assert_eq!(num_files, 2 * target_partition_number); // Return Ok if the function Ok(()) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index b2a1a0338384..b3fb41ea100f 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -27,8 +27,11 @@ use std::sync::Arc; use crate::config::ConfigOptions; use crate::datasource::physical_plan::{CsvExec, ParquetExec}; -use crate::error::{DataFusionError, Result}; -use crate::physical_optimizer::utils::{add_sort_above, get_plan_string, ExecTree}; +use crate::error::Result; +use crate::physical_optimizer::utils::{ + add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions, + is_repartition, is_sort_preserving_merge, ExecTree, +}; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -44,7 +47,6 @@ use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::Partitioning; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; -use datafusion_common::internal_err; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::equivalence::EquivalenceProperties; @@ -57,6 +59,7 @@ use datafusion_physical_expr::{ }; use datafusion_physical_plan::unbounded_output; +use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec}; use itertools::izip; /// The `EnforceDistribution` rule ensures that distribution requirements are @@ -213,9 +216,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { 
distribution_context.transform_up(&|distribution_context| { ensure_distribution(distribution_context, config) })?; - - // If output ordering is not necessary, removes it - update_plan_to_remove_unnecessary_final_order(distribution_context) + Ok(distribution_context.plan) } fn name(&self) -> &str { @@ -979,12 +980,6 @@ fn add_roundrobin_on_top( RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(n_target))? .with_preserve_order(should_preserve_ordering), ) as Arc; - if let Some(exec_tree) = dist_onward { - return internal_err!( - "ExecTree should have been empty, but got:{:?}", - exec_tree - ); - } // update distribution onward with new operator update_distribution_onward(new_plan.clone(), dist_onward, input_idx); @@ -1112,8 +1107,9 @@ fn add_spm_on_top( } } -/// Updates the physical plan inside `distribution_context` if having a -/// `RepartitionExec(RoundRobin)` is not helpful. +/// Updates the physical plan inside `distribution_context` so that distribution +/// changing operators are removed from the top. If they are necessary, they will +/// be added in subsequent stages. /// /// Assume that following plan is given: /// ```text @@ -1122,14 +1118,13 @@ fn add_spm_on_top( /// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` /// -/// `RepartitionExec` at the top is unnecessary. Since it doesn't help with increasing parallelism. -/// This function removes top repartition, and returns following plan. 
+/// Since `RepartitionExec`s change the distribution, this function removes +/// them and returns following plan: /// /// ```text -/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", -/// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", +/// "ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` -fn remove_unnecessary_repartition( +fn remove_dist_changing_operators( distribution_context: DistributionContext, ) -> Result { let DistributionContext { @@ -1137,22 +1132,17 @@ fn remove_unnecessary_repartition( mut distribution_onwards, } = distribution_context; - // Remove any redundant RoundRobin at the start: - if let Some(repartition) = plan.as_any().downcast_ref::() { - if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() { - // Repartition is useless: - if *n_out <= repartition.input().output_partitioning().partition_count() { - let mut new_distribution_onwards = - vec![None; repartition.input().children().len()]; - if let Some(exec_tree) = &distribution_onwards[0] { - for child in &exec_tree.children { - new_distribution_onwards[child.idx] = Some(child.clone()); - } - } - plan = repartition.input().clone(); - distribution_onwards = new_distribution_onwards; - } - } + // Remove any distribution changing operators at the beginning: + // Note that they will be re-inserted later on if necessary or helpful. + while is_repartition(&plan) + || is_coalesce_partitions(&plan) + || is_sort_preserving_merge(&plan) + { + // All of above operators have a single child. When we remove the top + // operator, we take the first child. 
+ plan = plan.children()[0].clone(); + distribution_onwards = + get_children_exectrees(plan.children().len(), &distribution_onwards[0]); } // Create a plan with the updated children: @@ -1162,29 +1152,6 @@ fn remove_unnecessary_repartition( }) } -/// Changes each child of the `dist_context.plan` such that they no longer -/// use order preserving variants, if no ordering is required at the output -/// of the physical plan (there is no global ordering requirement by the query). -fn update_plan_to_remove_unnecessary_final_order( - dist_context: DistributionContext, -) -> Result> { - let DistributionContext { - plan, - distribution_onwards, - } = dist_context; - let new_children = izip!(plan.children(), distribution_onwards) - .map(|(mut child, mut dist_onward)| { - replace_order_preserving_variants(&mut child, &mut dist_onward)?; - Ok(child) - }) - .collect::>>()?; - if !new_children.is_empty() { - plan.with_new_children(new_children) - } else { - Ok(plan) - } -} - /// Updates the physical plan `input` by using `dist_onward` replace order preserving operator variants /// with their corresponding operators that do not preserve order. It is a wrapper for `replace_order_preserving_variants_helper` fn replace_order_preserving_variants( @@ -1224,18 +1191,16 @@ fn replace_order_preserving_variants_helper( for child in &exec_tree.children { updated_children[child.idx] = replace_order_preserving_variants_helper(child)?; } - if let Some(spm) = exec_tree - .plan - .as_any() - .downcast_ref::() - { - return Ok(Arc::new(CoalescePartitionsExec::new(spm.input().clone()))); + if is_sort_preserving_merge(&exec_tree.plan) { + return Ok(Arc::new(CoalescePartitionsExec::new( + updated_children[0].clone(), + ))); } if let Some(repartition) = exec_tree.plan.as_any().downcast_ref::() { if repartition.preserve_order() { return Ok(Arc::new( RepartitionExec::try_new( - repartition.input().clone(), + updated_children[0].clone(), repartition.partitioning().clone(), )? 
.with_preserve_order(false), @@ -1275,12 +1240,29 @@ fn ensure_distribution( repartition_beneficial_stat = stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true); } - // Remove unnecessary repartition from the physical plan if any let DistributionContext { - plan, + mut plan, mut distribution_onwards, - } = remove_unnecessary_repartition(dist_context)?; + } = remove_dist_changing_operators(dist_context)?; + + if let Some(exec) = plan.as_any().downcast_ref::() { + if let Some(updated_window) = get_best_fitting_window( + exec.window_expr(), + exec.input(), + &exec.partition_keys, + )? { + plan = updated_window; + } + } else if let Some(exec) = plan.as_any().downcast_ref::() { + if let Some(updated_window) = get_best_fitting_window( + exec.window_expr(), + exec.input(), + &exec.partition_keys, + )? { + plan = updated_window; + } + }; let n_children = plan.children().len(); // This loop iterates over all the children to: @@ -1369,19 +1351,23 @@ fn ensure_distribution( // Either: // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or // - using order preserving variant is not desirable. - if !ordering_satisfy_requirement_concrete( + let ordering_satisfied = ordering_satisfy_requirement_concrete( existing_ordering, required_input_ordering, || child.equivalence_properties(), || child.ordering_equivalence_properties(), - ) || !order_preserving_variants_desirable - { + ); + if !ordering_satisfied || !order_preserving_variants_desirable { replace_order_preserving_variants(&mut child, dist_onward)?; - let sort_expr = PhysicalSortRequirement::to_sort_exprs( - required_input_ordering.clone(), - ); - // Make sure to satisfy ordering requirement - add_sort_above(&mut child, sort_expr, None)?; + // If ordering requirements were satisfied before repartitioning, + // make sure ordering requirements are still satisfied after. 
+ if ordering_satisfied { + // Make sure to satisfy ordering requirement: + let sort_expr = PhysicalSortRequirement::to_sort_exprs( + required_input_ordering.clone(), + ); + add_sort_above(&mut child, sort_expr, None)?; + } } // Stop tracking distribution changing operators *dist_onward = None; @@ -1690,6 +1676,7 @@ mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::physical_optimizer::enforce_sorting::EnforceSorting; + use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -1703,9 +1690,12 @@ mod tests { use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics}; - use crate::physical_optimizer::test_utils::repartition_exec; + use crate::physical_optimizer::test_utils::{ + coalesce_partitions_exec, repartition_exec, + }; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::sorts::sort::SortExec; + use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::ScalarValue; @@ -1714,7 +1704,7 @@ mod tests { use datafusion_physical_expr::expressions::{BinaryExpr, Literal}; use datafusion_physical_expr::{ expressions, expressions::binary, expressions::lit, expressions::Column, - PhysicalExpr, PhysicalSortExpr, + LexOrdering, PhysicalExpr, PhysicalSortExpr, }; /// Models operators like BoundedWindowExec that require an input @@ -1722,11 +1712,23 @@ mod tests { #[derive(Debug)] struct SortRequiredExec { input: Arc, + expr: LexOrdering, } impl SortRequiredExec { fn new(input: Arc) -> Self { - Self { input } + let expr = input.output_ordering().unwrap_or(&[]).to_vec(); + Self { input, expr } + } + + fn new_with_requirement( + input: Arc, + requirement: Vec, + ) -> Self { + Self 
{ + input, + expr: requirement, + } } } @@ -1736,7 +1738,8 @@ mod tests { _t: DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - write!(f, "SortRequiredExec") + let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); + write!(f, "SortRequiredExec: [{}]", expr.join(",")) } } @@ -1778,7 +1781,10 @@ mod tests { ) -> Result> { assert_eq!(children.len(), 1); let child = children.pop().unwrap(); - Ok(Arc::new(Self::new(child))) + Ok(Arc::new(Self::new_with_requirement( + child, + self.expr.clone(), + ))) } fn execute( @@ -2054,6 +2060,13 @@ mod tests { Arc::new(SortRequiredExec::new(input)) } + fn sort_required_exec_with_req( + input: Arc, + sort_exprs: LexOrdering, + ) -> Arc { + Arc::new(SortRequiredExec::new_with_requirement(input, sort_exprs)) + } + fn trim_plan_display(plan: &str) -> Vec<&str> { plan.split('\n') .map(|s| s.trim()) @@ -2118,10 +2131,15 @@ mod tests { // `EnforceSorting` and `EnforceDistribution`. // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create // new tests for the cascade. + + // Add the ancillary output requirements operator at the start: + let optimizer = OutputRequirements::new_add_mode(); + let optimized = optimizer.optimize($PLAN.clone(), &config)?; + let optimized = if $FIRST_ENFORCE_DIST { // Run enforce distribution rule first: let optimizer = EnforceDistribution::new(); - let optimized = optimizer.optimize($PLAN.clone(), &config)?; + let optimized = optimizer.optimize(optimized, &config)?; // The rule should be idempotent. // Re-running this rule shouldn't introduce unnecessary operators. 
let optimizer = EnforceDistribution::new(); @@ -2133,7 +2151,7 @@ mod tests { } else { // Run the enforce sorting rule first: let optimizer = EnforceSorting::new(); - let optimized = optimizer.optimize($PLAN.clone(), &config)?; + let optimized = optimizer.optimize(optimized, &config)?; // Run enforce distribution rule: let optimizer = EnforceDistribution::new(); let optimized = optimizer.optimize(optimized, &config)?; @@ -2144,6 +2162,10 @@ mod tests { optimized }; + // Remove the ancillary output requirements operator when done: + let optimizer = OutputRequirements::new_remove_mode(); + let optimized = optimizer.optimize(optimized, &config)?; + // Now format correctly let plan = displayable(optimized.as_ref()).indent(true).to_string(); let actual_lines = trim_plan_display(&plan); @@ -2978,7 +3000,7 @@ mod tests { format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]"); let expected = match join_type { - // Should include 6 RepartitionExecs 3 SortExecs + // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![ top_join_plan.as_str(), @@ -2997,9 +3019,18 @@ mod tests { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], - // Should include 7 RepartitionExecs + // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs + // Since ordering of the left child is not preserved after SortMergeJoin + // when mode is Right, RightSemi, RightAnti, Full + // - We need to add one additional SortExec after SortMergeJoin in contrast to the test cases + // when mode is Inner, Left, LeftSemi, LeftAnti + // Similarly, since partitioning of the left side is not preserved + // when mode is Right, RightSemi, RightAnti, Full + // - We need to add one additional Hash Repartition after SortMergeJoin in contrast to the test + // cases when mode is Inner, Left, LeftSemi, 
LeftAnti _ => vec![ top_join_plan.as_str(), + // Below 2 operators are differences introduced, when join mode is changed "SortExec: expr=[a@0 ASC]", "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", join_plan.as_str(), @@ -3021,7 +3052,7 @@ mod tests { assert_optimized!(expected, top_join.clone(), true, true); let expected_first_sort_enforcement = match join_type { - // Should include 3 RepartitionExecs 3 SortExecs + // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![ top_join_plan.as_str(), @@ -3040,9 +3071,18 @@ mod tests { "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], - // Should include 8 RepartitionExecs (4 of them preserves ordering) + // Should include 8 RepartitionExecs (4 hash, 4 round-robin), 4 SortExecs + // Since ordering of the left child is not preserved after SortMergeJoin + // when mode is Right, RightSemi, RightAnti, Full + // - We need to add one additional SortExec after SortMergeJoin in contrast to the test cases + // when mode is Inner, Left, LeftSemi, LeftAnti + // Similarly, since partitioning of the left side is not preserved + // when mode is Right, RightSemi, RightAnti, Full + // - We need to add one additional Hash Repartition and Roundrobin repartition after + // SortMergeJoin in contrast to the test cases when mode is Inner, Left, LeftSemi, LeftAnti _ => vec![ top_join_plan.as_str(), + // Below 4 operators are differences introduced, when join mode is changed "SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", @@ -3083,7 +3123,7 @@ mod tests { format!("SortMergeJoin: join_type={join_type}, on=[(b1@6, c@2)]"); let expected = match join_type { - // Should include 3 RepartitionExecs and 3 SortExecs + // Should include 6 
RepartitionExecs(3 hash, 3 round-robin) and 3 SortExecs JoinType::Inner | JoinType::Right => vec![ top_join_plan.as_str(), join_plan.as_str(), @@ -3101,8 +3141,8 @@ mod tests { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], - // Should include 4 RepartitionExecs and 4 SortExecs - _ => vec![ + // Should include 7 RepartitionExecs (4 hash, 3 round-robin) and 4 SortExecs + JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), "SortExec: expr=[b1@6 ASC]", "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", @@ -3121,6 +3161,8 @@ mod tests { "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], + // this match arm cannot be reached + _ => unreachable!() }; assert_optimized!(expected, top_join.clone(), true, true); @@ -3144,7 +3186,7 @@ mod tests { "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs - _ => vec![ + JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), "SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", @@ -3165,6 +3207,8 @@ mod tests { "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], + // this match arm cannot be reached + _ => unreachable!() }; assert_optimized!( expected_first_sort_enforcement, @@ -3301,6 +3345,16 @@ mod tests { "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", ]; assert_optimized!(expected, exec, true); + // In this case preserving ordering through order preserving operators is not desirable + // (according to flag: bounded_order_preserving_variants) + // hence in this case ordering lost 
during CoalescePartitionsExec and re-introduced with + // SortExec at the top. + let expected = &[ + "SortExec: expr=[a@0 ASC]", + "CoalescePartitionsExec", + "CoalesceBatchesExec: target_batch_size=4096", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + ]; assert_optimized!(expected, exec, false); Ok(()) } @@ -3435,7 +3489,7 @@ mod tests { sort_required_exec(filter_exec(sort_exec(sort_key, parquet_exec(), false))); let expected = &[ - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "FilterExec: c@2 = 0", // We can use repartition here, ordering requirement by SortRequiredExec // is still satisfied. @@ -3541,6 +3595,12 @@ mod tests { ]; assert_optimized!(expected, plan.clone(), true); + + let expected = &[ + "SortExec: expr=[c@2 ASC]", + "CoalescePartitionsExec", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + ]; assert_optimized!(expected, plan, false); Ok(()) } @@ -3565,6 +3625,14 @@ mod tests { ]; assert_optimized!(expected, plan.clone(), true); + + let expected = &[ + "SortExec: expr=[c@2 ASC]", + "CoalescePartitionsExec", + "UnionExec", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + ]; assert_optimized!(expected, plan, false); Ok(()) } @@ -3583,7 +3651,7 @@ mod tests { // during repartitioning ordering is preserved let expected = &[ - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "FilterExec: c@2 = 0", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", @@ -3619,7 +3687,7 @@ mod tests { let expected = &[ "UnionExec", // union input 1: no repartitioning - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", // union input 2: should repartition "FilterExec: c@2 = 0", @@ -3687,16 +3755,13 @@ mod tests { ("c".to_string(), "c".to_string()), ]; // sorted input - let plan = sort_preserving_merge_exec( - sort_key.clone(), - projection_exec_with_alias( - parquet_exec_multiple_sorted(vec![sort_key]), - alias, - ), - ); + let plan = sort_required_exec(projection_exec_with_alias( + parquet_exec_multiple_sorted(vec![sort_key]), + alias, + )); let expected = &[ - "SortPreservingMergeExec: [c@2 ASC]", + "SortRequiredExec: [c@2 ASC]", // Since this projection is trivial, increasing parallelism is not beneficial "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", @@ -4186,11 +4251,11 @@ mod tests { // no parallelization, because SortRequiredExec doesn't benefit from increased parallelism let expected_parquet = &[ - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", ]; let expected_csv = &[ - "SortRequiredExec", + "SortRequiredExec: [c@2 ASC]", "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false", ]; @@ -4348,6 +4413,14 @@ mod tests { ]; assert_optimized!(expected, physical_plan.clone(), true); + + let expected = &[ + "SortExec: expr=[c@2 ASC]", + "CoalescePartitionsExec", + "FilterExec: c@2 = 0", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + ]; assert_optimized!(expected, physical_plan, false); Ok(()) @@ -4377,6 +4450,15 @@ mod tests { ]; assert_optimized!(expected, physical_plan.clone(), true); + + let expected = &[ + "SortExec: expr=[a@0 ASC]", + "CoalescePartitionsExec", + "SortExec: expr=[a@0 ASC]", + "FilterExec: c@2 = 
0", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", + ]; assert_optimized!(expected, physical_plan, false); Ok(()) @@ -4404,6 +4486,86 @@ mod tests { Ok(()) } + #[test] + fn do_not_put_sort_when_input_is_invalid() -> Result<()> { + let schema = schema(); + let sort_key = vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]; + let input = parquet_exec(); + let physical_plan = sort_required_exec_with_req(filter_exec(input), sort_key); + let expected = &[ + // Ordering requirement of sort required exec is NOT satisfied + // by existing ordering at the source. + "SortRequiredExec: [a@0 ASC]", + "FilterExec: c@2 = 0", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + assert_plan_txt!(expected, physical_plan); + + let expected = &[ + "SortRequiredExec: [a@0 ASC]", + // Since at the start of the rule ordering requirement is not satisfied + // EnforceDistribution rule doesn't satisfy this requirement either. 
+ "FilterExec: c@2 = 0", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + + let mut config = ConfigOptions::new(); + config.execution.target_partitions = 10; + config.optimizer.enable_round_robin_repartition = true; + config.optimizer.bounded_order_preserving_variants = false; + let distribution_plan = + EnforceDistribution::new().optimize(physical_plan, &config)?; + assert_plan_txt!(expected, distribution_plan); + + Ok(()) + } + + #[test] + fn put_sort_when_input_is_valid() -> Result<()> { + let schema = schema(); + let sort_key = vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]; + let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]); + let physical_plan = sort_required_exec_with_req(filter_exec(input), sort_key); + + let expected = &[ + // Ordering requirement of sort required exec is satisfied + // by existing ordering at the source. + "SortRequiredExec: [a@0 ASC]", + "FilterExec: c@2 = 0", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + ]; + assert_plan_txt!(expected, physical_plan); + + let expected = &[ + "SortRequiredExec: [a@0 ASC]", + // Since at the start of the rule ordering requirement is satisfied + // EnforceDistribution rule satisfy this requirement also. + // ordering is re-satisfied by introduction of SortExec. 
+ "SortExec: expr=[a@0 ASC]", + "FilterExec: c@2 = 0", + // ordering is lost here + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", + "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", + ]; + + let mut config = ConfigOptions::new(); + config.execution.target_partitions = 10; + config.optimizer.enable_round_robin_repartition = true; + config.optimizer.bounded_order_preserving_variants = false; + let distribution_plan = + EnforceDistribution::new().optimize(physical_plan, &config)?; + assert_plan_txt!(expected, distribution_plan); + + Ok(()) + } + #[test] fn do_not_add_unnecessary_hash() -> Result<()> { let schema = schema(); @@ -4458,4 +4620,51 @@ mod tests { Ok(()) } + + #[test] + fn optimize_away_unnecessary_repartition() -> Result<()> { + let physical_plan = coalesce_partitions_exec(repartition_exec(parquet_exec())); + let expected = &[ + "CoalescePartitionsExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + plans_matches_expected!(expected, physical_plan.clone()); + + let expected = + &["ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]"]; + + assert_optimized!(expected, physical_plan.clone(), true); + assert_optimized!(expected, physical_plan, false); + + Ok(()) + } + + #[test] + fn optimize_away_unnecessary_repartition2() -> Result<()> { + let physical_plan = filter_exec(repartition_exec(coalesce_partitions_exec( + filter_exec(repartition_exec(parquet_exec())), + ))); + let expected = &[ + "FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CoalescePartitionsExec", + " FilterExec: c@2 = 0", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + plans_matches_expected!(expected, 
physical_plan.clone()); + + let expected = &[ + "FilterExec: c@2 = 0", + "FilterExec: c@2 = 0", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", + ]; + + assert_optimized!(expected, physical_plan.clone(), true); + assert_optimized!(expected, physical_plan, false); + + Ok(()) + } } diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index a149330181d9..c4b72a7cb31e 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -2035,7 +2035,6 @@ mod tests { let orig_plan = Arc::new(SortExec::new(sort_exprs, repartition)) as Arc; let actual = get_plan_string(&orig_plan); - println!("{:?}", actual); let expected_input = vec![ "SortExec: expr=[nullable_col@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 0801a9bc595c..9e22bff340c9 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -28,6 +28,7 @@ pub mod enforce_distribution; pub mod enforce_sorting; pub mod join_selection; pub mod optimizer; +pub mod output_requirements; pub mod pipeline_checker; pub mod pruning; pub mod replace_with_order_preserving_variants; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 5de70efe3c47..95035e5f81a0 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -26,6 +26,7 @@ use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAgg use crate::physical_optimizer::enforce_distribution::EnforceDistribution; use crate::physical_optimizer::enforce_sorting::EnforceSorting; 
use crate::physical_optimizer::join_selection::JoinSelection; +use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_optimizer::pipeline_checker::PipelineChecker; use crate::physical_optimizer::topk_aggregation::TopKAggregation; use crate::{error::Result, physical_plan::ExecutionPlan}; @@ -68,6 +69,9 @@ impl PhysicalOptimizer { /// Create a new optimizer using the recommended list of rules pub fn new() -> Self { let rules: Vec> = vec![ + // If there is an output requirement of the query, make sure that + // this information is not lost across different rules during optimization. + Arc::new(OutputRequirements::new_add_mode()), Arc::new(AggregateStatistics::new()), // Statistics-based join selection will change the Auto mode to a real join implementation, // like collect left, or hash join, or future sort merge join, which will influence the @@ -90,6 +94,9 @@ impl PhysicalOptimizer { // The CoalesceBatches rule will not influence the distribution and ordering of the // whole plan tree. Therefore, to avoid influencing other rules, it should run last. Arc::new(CoalesceBatches::new()), + // Remove the ancillary output requirement operator since we are done with the planning + // phase. + Arc::new(OutputRequirements::new_remove_mode()), // The PipelineChecker rule will reject non-runnable query plans that use // pipeline-breaking operators on infinite input(s). The rule generates a // diagnostic error message when this happens. It makes no changes to the diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs new file mode 100644 index 000000000000..4b687d7f3536 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The `OutputRequirements` optimizer rule either: +//! - Adds an auxiliary `OutputRequirementExec` operator to keep track of global +//! ordering and distribution requirements across rules, or +//! - Removes the auxiliary `OutputRequirementExec` operator from the physical plan. +//! Since the `OutputRequirementExec` operator is only a helper operator, it +//! shouldn't occur in the final plan (i.e. the executed plan). + +use std::sync::Arc; + +use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; + +use arrow_schema::SchemaRef; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{Result, Statistics}; +use datafusion_physical_expr::{ + Distribution, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement, +}; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + +/// This rule either adds or removes [`OutputRequirementExec`]s to/from the physical +/// plan according to its `mode` attribute, which is set by the constructors +/// `new_add_mode` and `new_remove_mode`.
With this rule, we can keep track of +/// the global requirements (ordering and distribution) across rules. +/// +/// The primary use case of this node and rule is to specify and preserve the desired output +/// ordering and distribution of the entire plan. When sending to a single client, a single partition may +/// be desirable, but when sending to a multi-partitioned writer, keeping multiple partitions may be +/// better. +#[derive(Debug)] +pub struct OutputRequirements { + mode: RuleMode, +} + +impl OutputRequirements { + /// Create a new rule which works in `Add` mode; i.e. it simply adds a + /// top-level [`OutputRequirementExec`] into the physical plan to keep track + /// of global ordering and distribution requirements if there are any. + /// Note that this rule should run at the beginning. + pub fn new_add_mode() -> Self { + Self { + mode: RuleMode::Add, + } + } + + /// Create a new rule which works in `Remove` mode; i.e. it simply removes + /// the top-level [`OutputRequirementExec`] from the physical plan if there is + /// any. We do this because an `OutputRequirementExec` is an ancillary, + /// non-executable operator whose sole purpose is to track global + /// requirements during optimization. Therefore, an + /// `OutputRequirementExec` should not appear in the final plan. + pub fn new_remove_mode() -> Self { + Self { + mode: RuleMode::Remove, + } + } +} + +#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Hash)] +enum RuleMode { + Add, + Remove, +} + +/// An ancillary, non-executable operator whose sole purpose is to track global +/// requirements during optimization. It imposes +/// - the ordering requirement in its `order_requirement` attribute. +/// - the distribution requirement in its `dist_requirement` attribute.
+/// +/// See [`OutputRequirements`] for more details +#[derive(Debug)] +struct OutputRequirementExec { + input: Arc, + order_requirement: Option, + dist_requirement: Distribution, +} + +impl OutputRequirementExec { + fn new( + input: Arc, + requirements: Option, + dist_requirement: Distribution, + ) -> Self { + Self { + input, + order_requirement: requirements, + dist_requirement, + } + } + + fn input(&self) -> Arc { + self.input.clone() + } +} + +impl DisplayAs for OutputRequirementExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "OutputRequirementExec") + } +} + +impl ExecutionPlan for OutputRequirementExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn output_partitioning(&self) -> crate::physical_plan::Partitioning { + self.input.output_partitioning() + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn required_input_distribution(&self) -> Vec { + vec![self.dist_requirement.clone()] + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.input.output_ordering() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn required_input_ordering(&self) -> Vec>> { + vec![self.order_requirement.clone()] + } + + fn unbounded_output(&self, children: &[bool]) -> Result { + // Has a single child + Ok(children[0]) + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + Ok(Arc::new(Self::new( + children.remove(0), // has a single child + self.order_requirement.clone(), + self.dist_requirement.clone(), + ))) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!(); + } + + fn statistics(&self) -> Statistics { + self.input.statistics() + } +} + +impl PhysicalOptimizerRule for OutputRequirements { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + match 
self.mode { + RuleMode::Add => require_top_ordering(plan), + RuleMode::Remove => plan.transform_up(&|plan| { + if let Some(sort_req) = + plan.as_any().downcast_ref::() + { + Ok(Transformed::Yes(sort_req.input())) + } else { + Ok(Transformed::No(plan)) + } + }), + } + } + + fn name(&self) -> &str { + "OutputRequirements" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// This function adds an ancillary `OutputRequirementExec` to the physical plan, so that +/// global requirements are not lost during optimization. +fn require_top_ordering(plan: Arc) -> Result> { + let (new_plan, is_changed) = require_top_ordering_helper(plan)?; + if is_changed { + Ok(new_plan) + } else { + // Add `OutputRequirementExec` to the top, with no specified ordering and distribution requirement. + Ok(Arc::new(OutputRequirementExec::new( + new_plan, + // there is no ordering requirement + None, + Distribution::UnspecifiedDistribution, + )) as _) + } +} + +/// Helper function that adds an ancillary `OutputRequirementExec` to the given plan. +/// First entry in the tuple is the resulting plan, second entry indicates whether any +/// `OutputRequirementExec` is added to the plan. +fn require_top_ordering_helper( + plan: Arc, +) -> Result<(Arc, bool)> { + let mut children = plan.children(); + // Global ordering defines desired ordering in the final result.
+ if children.len() != 1 { + Ok((plan, false)) + } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { + let req_ordering = sort_exec.output_ordering().unwrap_or(&[]); + let req_dist = sort_exec.required_input_distribution()[0].clone(); + let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering); + Ok(( + Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) as _, + true, + )) + } else if let Some(spm) = plan.as_any().downcast_ref::() { + let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr()); + Ok(( + Arc::new(OutputRequirementExec::new( + plan, + Some(reqs), + Distribution::SinglePartition, + )) as _, + true, + )) + } else if plan.maintains_input_order()[0] + && plan.required_input_ordering()[0].is_none() + { + // Keep searching for a `SortExec` as long as ordering is maintained, + // and on-the-way operators do not themselves require an ordering. + // When an operator requires an ordering, any `SortExec` below can not + // be responsible for (i.e. the originator of) the global ordering. + let (new_child, is_changed) = + require_top_ordering_helper(children.swap_remove(0))?; + Ok((plan.with_new_children(vec![new_child])?, is_changed)) + } else { + // Stop searching, there is no global ordering desired for the query. + Ok((plan, false)) + } +} diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 21c976e07a15..0d6c85f9f22b 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -73,6 +73,30 @@ impl ExecTree { } } +/// Get `ExecTree` for each child of the plan if they are tracked. +/// # Arguments +/// +/// * `n_children` - Children count of the plan of interest +/// * `onward` - Contains `Some(ExecTree)` if the plan is tracked. +/// - Contains `None` if the plan is not tracked. +/// +/// # Returns +/// +/// A `Vec>` that contains tracking information of each child. +/// If a child is `None`, it is not tracked.
If it is `Some(ExecTree)`, the child is tracked. +pub(crate) fn get_children_exectrees( + n_children: usize, + onward: &Option, +) -> Vec> { + let mut children_onward = vec![None; n_children]; + if let Some(exec_tree) = &onward { + for child in &exec_tree.children { + children_onward[child.idx] = Some(child.clone()); + } + } + children_onward +} + /// This utility function adds a `SortExec` above an operator according to the /// given ordering requirements while preserving the original partitioning. pub fn add_sort_above( diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 79214092fa57..73085937cbca 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +use std::ops::Deref; +use std::sync::Arc; + use arrow::array::{Int32Builder, Int64Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; -use async_trait::async_trait; use datafusion::datasource::provider::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; @@ -32,10 +34,10 @@ use datafusion::physical_plan::{ use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use datafusion_common::cast::as_primitive_array; -use datafusion_common::{not_impl_err, DataFusionError}; +use datafusion_common::{internal_err, not_impl_err, DataFusionError}; use datafusion_expr::expr::{BinaryExpr, Cast}; -use std::ops::Deref; -use std::sync::Arc; + +use async_trait::async_trait; fn create_batch(value: i32, num_rows: usize) -> Result { let mut builder = Int32Builder::with_capacity(num_rows); @@ -96,9 +98,14 @@ impl ExecutionPlan for CustomPlan { fn with_new_children( self: Arc, - _:
Vec>, + children: Vec>, ) -> Result> { - unreachable!() + // CustomPlan has no children + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } } fn execute( diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 8eddf57ae551..646d42795ba4 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -104,6 +104,10 @@ impl ExecutionPlan for CoalescePartitionsExec { self.input.equivalence_properties() } + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + fn with_new_children( self: Arc, children: Vec>, diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index b29c8e9c7bd9..1dcdae56cfa3 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -126,9 +126,14 @@ impl ExecutionPlan for MemoryExec { fn with_new_children( self: Arc, - _: Vec>, + children: Vec>, ) -> Result> { - internal_err!("Children cannot be replaced in {self:?}") + // MemoryExec has no children + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } } fn execute( diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 00809b71e443..cb972fa41e3e 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -163,9 +163,13 @@ impl ExecutionPlan for StreamingTableExec { fn with_new_children( self: Arc, - _children: Vec>, + children: Vec>, ) -> Result> { - internal_err!("Children cannot be replaced in {self:?}") + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } } fn execute( diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 
b1ba1eb36d11..27ab8671e939 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -242,12 +242,16 @@ logical_plan after eliminate_projection SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan TableScan: simple_explain_test projection=[a, b, c] initial_physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true +physical_plan after OutputRequirements +OutputRequirementExec +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan after aggregate_statistics SAME TEXT AS ABOVE physical_plan after join_selection SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE +physical_plan after OutputRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan after PipelineChecker SAME TEXT AS ABOVE physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index ffef93837b27..5bb0f31ed542 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -2014,9 +2014,9 @@ Sort: l.col0 ASC NULLS LAST ----------TableScan: tab0 projection=[col0, col1] physical_plan SortPreservingMergeExec: [col0@0 ASC NULLS LAST] ---ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1] 
-----AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered -------SortExec: expr=[col0@0 ASC NULLS LAST] +--SortExec: expr=[col0@0 ASC NULLS LAST] +----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1] +------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] --------CoalesceBatchesExec: target_batch_size=8192 ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 ------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 3d9f7511be26..b6325fd889ec 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3197,6 +3197,72 @@ SELECT a_new, d, rn1 FROM (SELECT d, a as a_new, 0 0 4 0 1 5 +query TT +EXPLAIN SELECT SUM(a) OVER(partition by a, b order by c) as sum1, +SUM(a) OVER(partition by b, a order by c) as sum2, + SUM(a) OVER(partition by a, d order by b) as sum3, + SUM(a) OVER(partition by d order by a) as sum4 +FROM annotated_data_infinite2; +---- +logical_plan +Projection: SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum3, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum4 +--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +------------TableScan: annotated_data_infinite2 projection=[a, b, c, d] 
+physical_plan +ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum3, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum4] +--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Linear] +----ProjectionExec: expr=[a@0 as a, d@3 as d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY 
[annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[PartiallySorted([0])] +----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true + +statement ok +set datafusion.execution.target_partitions = 2; + +# re-execute the same query in multi partitions. 
+# final plan should still be streamable +query TT +EXPLAIN SELECT SUM(a) OVER(partition by a, b order by c) as sum1, + SUM(a) OVER(partition by b, a order by c) as sum2, + SUM(a) OVER(partition by a, d order by b) as sum3, + SUM(a) OVER(partition by d order by a) as sum4 +FROM annotated_data_infinite2; +---- +logical_plan +Projection: SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum3, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum4 +--WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +--------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +----------WindowAggr: windowExpr=[[SUM(CAST(annotated_data_infinite2.a AS Int64)) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +------------TableScan: annotated_data_infinite2 projection=[a, b, c, d] +physical_plan +ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as sum1, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum2, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as sum3, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as sum4] +--BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Linear] +----CoalesceBatchesExec: target_batch_size=4096 +------SortPreservingRepartitionExec: partitioning=Hash([d@1], 2), input_partitions=2 +--------ProjectionExec: expr=[a@0 as a, d@3 as d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] +----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY 
[annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------------CoalesceBatchesExec: target_batch_size=4096 +--------------SortPreservingRepartitionExec: partitioning=Hash([b@1, a@0], 2), input_partitions=2 +----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[PartiallySorted([0])] +------------------CoalesceBatchesExec: target_batch_size=4096 +--------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, d@3], 2), input_partitions=2 +----------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] +------------------------CoalesceBatchesExec: target_batch_size=4096 +--------------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2 +----------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true + +# reset the partition number 1 again +statement ok +set datafusion.execution.target_partitions = 1; + statement ok drop table annotated_data_finite2