[bug]: Fix multi partition wrong column requirement bug (apache#7129)
* bug fix, aggregate multi partition wrong index

* Add check for whether requirement expression is already used in group by

* Minor changes

* minor changes

* Minor changes

* Minor changes

* Minor changes

* minor changes

* Minor changes

* Minor changes

* Minor changes

* Update test

* simplifications

* Update merge_batch of first and last

* add new test

* Simplifications

* Remove unnecessary code

* Minor changes

* Minor changes

* Simplifications

* Minor changes

* Simplifications

* Add comment

* Remove artifact during merge

* move is_first_stage to method

* Improve comments, use more idiomatic constructs

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
mustafasrepo and ozankabak authored Aug 1, 2023
1 parent ddb9549 commit e39b5ca
Showing 6 changed files with 526 additions and 164 deletions.
275 changes: 229 additions & 46 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -25,6 +25,7 @@ use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
SendableRecordBatchStream, Statistics,
};

use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
@@ -40,6 +41,7 @@ use datafusion_physical_expr::{
AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
@@ -84,6 +86,20 @@ pub enum AggregateMode {
SinglePartitioned,
}

impl AggregateMode {
/// Checks whether this aggregation step describes a "first stage" calculation.
/// In other words, its input is not another aggregation result and the
/// `merge_batch` method will not be called for these modes.
fn is_first_stage(&self) -> bool {
match self {
AggregateMode::Partial
| AggregateMode::Single
| AggregateMode::SinglePartitioned => true,
AggregateMode::Final | AggregateMode::FinalPartitioned => false,
}
}
}

/// Group By expression modes
///
/// `PartiallyOrdered` and `FullyOrdered` are used to reason about
@@ -95,9 +111,6 @@ pub enum AggregateMode {
/// previous combinations are guaranteed never to appear again
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupByOrderMode {
/// The input is not (known to be) ordered by any of the
/// expressions in the GROUP BY clause.
None,
/// The input is known to be ordered by a preset (prefix but
/// possibly reordered) of the expressions in the `GROUP BY` clause.
///
@@ -475,13 +488,13 @@ fn calc_required_input_ordering(
};
for (is_reverse, aggregator_requirement) in aggregator_requirements.into_iter() {
if let Some(AggregationOrdering {
ordering,
// If the mode is FullyOrdered or PartiallyOrdered (i.e. we are
// running with bounded memory, without breaking the pipeline),
// then we append the aggregator ordering requirement to the existing
// ordering. This way, we can still run with bounded memory.
mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
order_indices,
..
}) = aggregation_ordering
{
// Get the section of the input ordering that enables us to run in
@@ -495,32 +508,17 @@
let mut requirement =
PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
for req in aggregator_requirement {
if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
requirement.push(req.clone());
}
// In partial mode, append required ordering of the aggregator to the output ordering.
// In case of multiple partitions, this enables us to reduce partitions correctly.
if matches!(mode, AggregateMode::Partial)
&& ordering.iter().all(|item| req.expr.ne(&item.expr))
// Final and FinalPartitioned modes don't enforce ordering
// requirements since order-sensitive aggregators handle such
// requirements during merging.
if mode.is_first_stage()
&& requirement.iter().all(|item| req.expr.ne(&item.expr))
{
ordering.push(req.into());
requirement.push(req);
}
}
required_input_ordering = requirement;
} else {
// If there was no pre-existing output ordering, the output ordering is simply the required
// ordering of the aggregator in partial mode.
if matches!(mode, AggregateMode::Partial)
&& !aggregator_requirement.is_empty()
{
*aggregation_ordering = Some(AggregationOrdering {
mode: GroupByOrderMode::None,
order_indices: vec![],
ordering: PhysicalSortRequirement::to_sort_exprs(
aggregator_requirement.clone(),
),
});
}
} else if mode.is_first_stage() {
required_input_ordering = aggregator_requirement;
}
// Keep track of the direction from which required_input_ordering is constructed:
@@ -596,12 +594,16 @@ impl AggregateExec {
.iter()
.zip(order_by_expr.into_iter())
.map(|(aggr_expr, fn_reqs)| {
// If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement
if is_order_sensitive(aggr_expr) {
fn_reqs
} else {
None
}
// If the aggregation function is order-sensitive and we are
// performing a "first stage" calculation, keep the ordering
// requirement as is; otherwise ignore the ordering requirement.
// In non-first stage modes, we accumulate data (using `merge_batch`)
// from different partitions (i.e. merge partial results). During
// this merge, we consider the ordering of each partial result.
// Hence, we do not need to use the ordering requirement in such
// modes as long as partial results are generated with the
// correct ordering.
fn_reqs.filter(|_| is_order_sensitive(aggr_expr) && mode.is_first_stage())
})
.collect::<Vec<_>>();
let mut aggregator_reverse_reqs = None;
@@ -645,7 +647,6 @@ impl AggregateExec {
}

let mut aggregation_ordering = calc_aggregation_ordering(&input, &group_by);

let required_input_ordering = calc_required_input_ordering(
&input,
&mut aggr_expr,
@@ -1216,42 +1217,45 @@ fn evaluate_group_by(
mod tests {
use super::*;
use crate::execution::context::SessionConfig;
use crate::physical_plan::aggregates::GroupByOrderMode::{
FullyOrdered, PartiallyOrdered,
};
use crate::physical_plan::aggregates::{
get_finest_requirement, get_working_mode, AggregateExec, AggregateMode,
PhysicalGroupBy,
};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::expressions::{col, Avg};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::{
DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionContext;
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{assert_is_pending, csv_exec_sorted};
use crate::{assert_batches_sorted_eq, physical_plan::common};
use crate::{assert_batches_eq, assert_batches_sorted_eq, physical_plan::common};

use arrow::array::{Float64Array, UInt32Array};
use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_physical_expr::expressions::{
lit, ApproxDistinct, Column, Count, FirstValue, Median,
lit, ApproxDistinct, Column, Count, FirstValue, LastValue, Median,
};
use datafusion_physical_expr::{
AggregateExpr, EquivalenceProperties, OrderingEquivalenceProperties,
PhysicalExpr, PhysicalSortExpr,
};
use futures::{FutureExt, Stream};

use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::StreamType;
use crate::physical_plan::aggregates::GroupByOrderMode::{
FullyOrdered, PartiallyOrdered,
};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::{
DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionContext;
use futures::{FutureExt, Stream};

// Generate a schema which consists of 5 columns (a, b, c, d, e)
fn create_test_schema() -> Result<SchemaRef> {
Expand Down Expand Up @@ -1370,6 +1374,57 @@ mod tests {
)
}

/// Generates some mock data for aggregate tests.
fn some_data_v2() -> (Arc<Schema>, Vec<RecordBatch>) {
// Define a schema:
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::UInt32, false),
Field::new("b", DataType::Float64, false),
]));

// Generate data so that the first and last values land in the 2nd and
// 3rd partitions. With this construction, the expected result cannot be
// produced by accident; it only appears if merging works properly,
// i.e. independently of the data insertion order.
(
schema.clone(),
vec![
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 4, 4])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
],
)
.unwrap(),
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
Arc::new(Float64Array::from(vec![0.0, 1.0, 2.0, 3.0])),
],
)
.unwrap(),
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
Arc::new(Float64Array::from(vec![3.0, 4.0, 5.0, 6.0])),
],
)
.unwrap(),
RecordBatch::try_new(
schema,
vec![
Arc::new(UInt32Array::from(vec![2, 3, 3, 4])),
Arc::new(Float64Array::from(vec![2.0, 3.0, 4.0, 5.0])),
],
)
.unwrap(),
],
)
}

async fn check_grouping_sets(input: Arc<dyn ExecutionPlan>) -> Result<()> {
let input_schema = input.schema();

Expand Down Expand Up @@ -1885,6 +1940,134 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn run_first_last_multi_partitions() -> Result<()> {
for use_coalesce_batches in [false, true] {
for is_first_acc in [false, true] {
first_last_multi_partitions(use_coalesce_batches, is_first_acc).await?
}
}
Ok(())
}

// This function either constructs the physical plan below,
//
// "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[FIRST_VALUE(b)]",
// " CoalesceBatchesExec: target_batch_size=1024",
// " CoalescePartitionsExec",
// " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[FIRST_VALUE(b)], ordering_mode=None",
// " MemoryExec: partitions=4, partition_sizes=[1, 1, 1, 1]",
//
// or
//
// "AggregateExec: mode=Final, gby=[a@0 as a], aggr=[FIRST_VALUE(b)]",
// " CoalescePartitionsExec",
// " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[FIRST_VALUE(b)], ordering_mode=None",
// " MemoryExec: partitions=4, partition_sizes=[1, 1, 1, 1]",
//
// and checks whether the function `merge_batch` works correctly for
// FIRST_VALUE and LAST_VALUE functions.
async fn first_last_multi_partitions(
use_coalesce_batches: bool,
is_first_acc: bool,
) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();

let (schema, data) = some_data_v2();
let partition1 = data[0].clone();
let partition2 = data[1].clone();
let partition3 = data[2].clone();
let partition4 = data[3].clone();

let groups =
PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]);

let ordering_req = vec![PhysicalSortExpr {
expr: col("b", &schema)?,
options: SortOptions::default(),
}];
let aggregates: Vec<Arc<dyn AggregateExpr>> = if is_first_acc {
vec![Arc::new(FirstValue::new(
col("b", &schema)?,
"FIRST_VALUE(b)".to_string(),
DataType::Float64,
ordering_req.clone(),
vec![DataType::Float64],
))]
} else {
vec![Arc::new(LastValue::new(
col("b", &schema)?,
"LAST_VALUE(b)".to_string(),
DataType::Float64,
ordering_req.clone(),
vec![DataType::Float64],
))]
};

let memory_exec = Arc::new(MemoryExec::try_new(
&[
vec![partition1],
vec![partition2],
vec![partition3],
vec![partition4],
],
schema.clone(),
None,
)?);
let aggregate_exec = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
vec![None],
vec![Some(ordering_req.clone())],
memory_exec,
schema.clone(),
)?);
let coalesce = if use_coalesce_batches {
let coalesce = Arc::new(CoalescePartitionsExec::new(aggregate_exec));
Arc::new(CoalesceBatchesExec::new(coalesce, 1024)) as Arc<dyn ExecutionPlan>
} else {
Arc::new(CoalescePartitionsExec::new(aggregate_exec))
as Arc<dyn ExecutionPlan>
};
let aggregate_final = Arc::new(AggregateExec::try_new(
AggregateMode::Final,
groups,
aggregates.clone(),
vec![None],
vec![Some(ordering_req)],
coalesce,
schema,
)?) as Arc<dyn ExecutionPlan>;

let result = crate::physical_plan::collect(aggregate_final, task_ctx).await?;
if is_first_acc {
let expected = vec![
"+---+----------------+",
"| a | FIRST_VALUE(b) |",
"+---+----------------+",
"| 2 | 0.0 |",
"| 3 | 1.0 |",
"| 4 | 3.0 |",
"+---+----------------+",
];
assert_batches_eq!(expected, &result);
} else {
let expected = vec![
"+---+---------------+",
"| a | LAST_VALUE(b) |",
"+---+---------------+",
"| 2 | 3.0 |",
"| 3 | 5.0 |",
"| 4 | 6.0 |",
"+---+---------------+",
];
assert_batches_eq!(expected, &result);
};
Ok(())
}

#[tokio::test]
async fn test_get_finest_requirements() -> Result<()> {
let test_schema = create_test_schema()?;
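The `first_last_multi_partitions` test above exercises the core of this fix: when partial FIRST_VALUE / LAST_VALUE results are merged across partitions, the winner must be chosen according to the aggregator's ordering requirement rather than the order in which partitions happen to arrive. The following standalone sketch illustrates that idea; the type and method names (FirstAcc, update, merge) are illustrative stand-ins, not DataFusion's actual Accumulator API.

// Minimal sketch of an order-aware "first value" accumulator. The names
// (`FirstAcc`, `update`, `merge`) are illustrative only and do not mirror
// DataFusion's Accumulator trait.
#[derive(Debug, Default, Clone, Copy)]
struct FirstAcc {
    /// Current candidate: (value, ordering key it was observed at).
    current: Option<(f64, f64)>,
}

impl FirstAcc {
    /// First-stage update within a single partition: keep the value with the
    /// smallest ordering key seen so far.
    fn update(&mut self, value: f64, order_key: f64) {
        match self.current {
            Some((_, key)) if key <= order_key => {}
            _ => self.current = Some((value, order_key)),
        }
    }

    /// Final-stage merge of partial results: compare ordering keys instead of
    /// blindly taking whichever partial result arrives first.
    fn merge(&mut self, other: &FirstAcc) {
        if let Some((value, key)) = other.current {
            self.update(value, key);
        }
    }
}

fn main() {
    // Two partitions; the globally first value (order key 0.0) sits in the
    // second partition, mirroring the layout of `some_data_v2` above.
    let mut p1 = FirstAcc::default();
    p1.update(1.0, 1.0);
    let mut p2 = FirstAcc::default();
    p2.update(0.0, 0.0);

    // Merging in either order yields the same result.
    let mut forward = FirstAcc::default();
    forward.merge(&p1);
    forward.merge(&p2);
    let mut backward = FirstAcc::default();
    backward.merge(&p2);
    backward.merge(&p1);
    assert_eq!(forward.current, Some((0.0, 0.0)));
    assert_eq!(forward.current, backward.current);
}

Because the merge compares ordering keys, the result is independent of the coalescing order, which is exactly what the expected tables in the test assert.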
1 change: 0 additions & 1 deletion datafusion/core/src/physical_plan/aggregates/order/mod.rs
@@ -52,7 +52,6 @@ impl GroupOrdering {
} = ordering;

Ok(match mode {
GroupByOrderMode::None => GroupOrdering::None,
GroupByOrderMode::PartiallyOrdered => {
let partial =
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
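For additional context on the `calc_required_input_ordering` change in aggregates/mod.rs: aggregator ordering requirements are now propagated to the input only in first-stage modes (Partial, Single, SinglePartitioned), and only for expressions the existing requirement does not already cover; Final and FinalPartitioned rely on order-aware merging instead. A simplified, self-contained model of that selection logic follows; SortReq and required_input_ordering are hypothetical names, not DataFusion's PhysicalSortRequirement API.

// Simplified model of the ordering-requirement construction performed by
// `calc_required_input_ordering`. `SortReq` is a stand-in for
// `PhysicalSortRequirement`; the real code works with physical expressions.
#[derive(Debug, Clone, PartialEq)]
struct SortReq {
    expr: String,
}

fn required_input_ordering(
    is_first_stage: bool,
    existing_requirement: &[SortReq],
    aggregator_requirement: &[SortReq],
) -> Vec<SortReq> {
    let mut requirement = existing_requirement.to_vec();
    if is_first_stage {
        for req in aggregator_requirement {
            // Skip requirements whose expression is already present, mirroring
            // the `requirement.iter().all(|item| req.expr.ne(&item.expr))` check.
            if requirement.iter().all(|item| item.expr != req.expr) {
                requirement.push(req.clone());
            }
        }
    }
    // Final / FinalPartitioned: nothing is appended, because order-sensitive
    // aggregators handle ordering while merging partial results.
    requirement
}

fn main() {
    let existing = vec![SortReq { expr: "a".to_string() }];
    let aggr = vec![
        SortReq { expr: "a".to_string() }, // already covered by the group-by prefix
        SortReq { expr: "b".to_string() },
    ];
    // First-stage modes push the aggregator requirement down (deduplicated).
    assert_eq!(required_input_ordering(true, &existing, &aggr).len(), 2);
    // Final modes leave the requirement untouched.
    assert_eq!(required_input_ordering(false, &existing, &aggr).len(), 1);
}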
