diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index bf5aa7d02272..ce0e22f35c6b 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1198,32 +1198,33 @@ fn ensure_distribution( ) .map( |(mut child, requirement, required_input_ordering, would_benefit, maintains)| { - // Don't need to apply when the returned row count is not greater than 1: + // Don't need to apply when the returned row count is not greater than batch size let num_rows = child.plan.statistics()?.num_rows; let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) { num_rows .get_value() .map(|value| value > &batch_size) - .unwrap_or(true) + .unwrap() // safe to unwrap since is_exact() is true } else { true }; + // When `repartition_file_scans` is set, attempt to increase + // parallelism at the source. + if repartition_file_scans && repartition_beneficial_stats { + if let Some(new_child) = + child.plan.repartitioned(target_partitions, config)? + { + child.plan = new_child; + } + } + if enable_round_robin // Operator benefits from partitioning (e.g. filter): && (would_benefit && repartition_beneficial_stats) // Unless partitioning doesn't increase the partition count, it is not beneficial: && child.plan.output_partitioning().partition_count() < target_partitions { - // When `repartition_file_scans` is set, attempt to increase - // parallelism at the source. - if repartition_file_scans { - if let Some(new_child) = - child.plan.repartitioned(target_partitions, config)? - { - child.plan = new_child; - } - } // Increase parallelism by adding round-robin repartitioning // on top of the operator. Note that we only do this if the // partition count is not already equal to the desired partition @@ -1362,17 +1363,10 @@ impl DistributionContext { fn update_children(mut self) -> Result { for child_context in self.children_nodes.iter_mut() { - child_context.distribution_connection = match child_context.plan.as_any() { - plan_any if plan_any.is::() => matches!( - plan_any - .downcast_ref::() - .unwrap() - .partitioning(), - Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _) - ), - plan_any - if plan_any.is::() - || plan_any.is::() => + child_context.distribution_connection = match &child_context.plan { + plan if is_repartition(plan) + || is_coalesce_partitions(plan) + || is_sort_preserving_merge(plan) => { true } @@ -3871,14 +3865,14 @@ pub(crate) mod tests { "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", // Plan already has two partitions - "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e]", ]; let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", // Plan already has two partitions - "CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], has_header=false", + "CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10); diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt b/datafusion/sqllogictest/test_files/arrow_typeof.slt index 3fad4d0f61b9..6a623e6c92f9 100644 --- a/datafusion/sqllogictest/test_files/arrow_typeof.slt +++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt @@ -375,4 +375,4 @@ select arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'); query T select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)')); ---- -LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) \ No newline at end of file +LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index 02eccd7c5d06..9d4951c7ecac 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -63,6 +63,26 @@ CoalesceBatchesExec: target_batch_size=8192 --FilterExec: column1@0 != 42 ----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 +# disable round robin repartitioning +statement ok +set datafusion.optimizer.enable_round_robin_repartition = false; + +## Expect to see the scan read the file as "4" groups with even sizes (offsets) again +query TT +EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42; +---- +logical_plan +Filter: parquet_table.column1 != Int32(42) +--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)] +physical_plan +CoalesceBatchesExec: target_batch_size=8192 +--FilterExec: column1@0 != 42 +----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 + +# enable round robin repartitioning again +statement ok +set datafusion.optimizer.enable_round_robin_repartition = true; + # create a second parquet file statement ok COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet' @@ -147,7 +167,7 @@ WITH HEADER ROW LOCATION 'test_files/scratch/repartition_scan/csv_table/'; query I -select * from csv_table; +select * from csv_table ORDER BY column1; ---- 1 2 @@ -190,7 +210,7 @@ STORED AS json LOCATION 'test_files/scratch/repartition_scan/json_table/'; query I -select * from "json_table"; +select * from "json_table" ORDER BY column1; ---- 1 2 diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index c84e46c965fa..8b0f50cedf05 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -1862,7 +1862,7 @@ SELECT to_timestamp(null) is null as c1, ---- true true true true true true true true true true true true true -# verify timestamp output types +# verify timestamp output types query TTT SELECT arrow_typeof(to_timestamp(1)), arrow_typeof(to_timestamp(null)), arrow_typeof(to_timestamp('2023-01-10 12:34:56.000')) ---- @@ -1880,7 +1880,7 @@ SELECT arrow_typeof(to_timestamp(1)) = arrow_typeof(1::timestamp) as c1, true true true true true true # known issues. currently overflows (expects default precision to be microsecond instead of nanoseconds. Work pending) -#verify extreme values +#verify extreme values #query PPPPPPPP #SELECT to_timestamp(-62125747200), to_timestamp(1926632005177), -62125747200::timestamp, 1926632005177::timestamp, cast(-62125747200 as timestamp), cast(1926632005177 as timestamp) #---- diff --git a/datafusion/sqllogictest/test_files/tpch/q2.slt.part b/datafusion/sqllogictest/test_files/tpch/q2.slt.part index ed439348d22d..ed950db190bb 100644 --- a/datafusion/sqllogictest/test_files/tpch/q2.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q2.slt.part @@ -238,7 +238,7 @@ order by p_partkey limit 10; ---- -9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 33-258-202-4782 s the slyly even ideas poach fluffily +9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 33-258-202-4782 s the slyly even ideas poach fluffily 9508.37 Supplier#000000070 FRANCE 3563 Manufacturer#1 INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express ideas. ironic ideas haggle about the final T 9508.37 Supplier#000000070 FRANCE 17268 Manufacturer#4 INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express ideas. ironic ideas haggle about the final T 9453.01 Supplier#000000802 ROMANIA 10021 Manufacturer#5 ,6HYXb4uaHITmtMBj4Ak57Pd 29-342-882-6463 gular frets. permanently special multipliers believe blithely alongs diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 7d6d59201396..100c2143837a 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3794,7 +3794,7 @@ select a, 1 1 2 1 -# support scalar value in ORDER BY +# support scalar value in ORDER BY query I select rank() over (order by 1) rnk from (select 1 a union all select 2 a) x ----