
[bug]: Fix multi partition wrong column requirement bug #7129

Merged (28 commits into apache:main, Aug 1, 2023)
Conversation

mustafasrepo (Contributor):

Which issue does this PR close?

Closes #7128.

Rationale for this change

What changes are included in this PR?

As explained in the issue body, the cause of this bug is that during the Aggregate::Final and Aggregate::FinalPartitioned stages, the ordering requirements of order-sensitive aggregators are no longer valid against the input schema (their input is the output of Aggregate::Partial). Hence, when the executor requires these expressions, they may refer to invalid or wrong columns.

This PR fixes the bug by removing the ordering requirement for the Aggregate::FinalPartitioned and Aggregate::Final modes. These modes receive the complete data from different partitions. Order-sensitive accumulators, in their merge_batch method, accumulate data from different partitions while taking the ordering of each partition into account. Hence, we do not need any outside mechanism (such as adding a Sort to the plan) for correct operation.
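As a rough illustration of the ordering-aware merge these accumulators perform (a minimal, self-contained sketch with simplified types -- the real merge_batch works on ScalarValues and honors arbitrary SortOptions):

```rust
/// Merge partition-local runs that are each already sorted by an
/// ordering key (here: ascending `ts`) into one globally sorted stream.
/// This is the idea behind letting `merge_batch` handle ordering instead
/// of inserting a `SortExec` before the final aggregation.
fn merge_sorted_runs(runs: &[Vec<(i64, f64)>]) -> Vec<(i64, f64)> {
    let mut cursors = vec![0usize; runs.len()];
    let mut merged = Vec::new();
    loop {
        // Find the run whose current head has the smallest ordering key.
        let mut best: Option<(usize, i64)> = None;
        for (i, &c) in cursors.iter().enumerate() {
            if c < runs[i].len() {
                let key = runs[i][c].0;
                if best.map_or(true, |(_, k)| key < k) {
                    best = Some((i, key));
                }
            }
        }
        match best {
            Some((i, _)) => {
                merged.push(runs[i][cursors[i]]);
                cursors[i] += 1;
            }
            None => break,
        }
    }
    merged
}

fn main() {
    // Three partitions, each internally sorted by `ts`.
    let runs = vec![
        vec![(2, 11.0), (5, 11.0), (8, 11.0)],
        vec![(1, 12.0), (4, 12.0), (7, 12.0)],
        vec![(3, 13.0), (6, 13.0), (9, 13.0)],
    ];
    let merged = merge_sorted_runs(&runs);
    // The globally smallest ts (1) comes out first, so FIRST_VALUE over
    // the merged stream is 12.0 -- no external sort required.
    assert_eq!(merged[0], (1, 12.0));
}
```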

Are these changes tested?

Yes, new tests are added.

Are there any user-facing changes?

@github-actions bot added the physical-expr (Physical Expressions), core (Core DataFusion crate), and sqllogictest (SQL Logic Tests (.slt)) labels on Jul 28, 2023
//
// and checks whether merge_batch for FIRST_VALUE AND LAST_VALUE
// works correctly.
async fn first_last_multi_partitions(
mustafasrepo (Author):

This unit test is written to check whether the merge_batch method for first_value and last_value works as expected.

],
)
.unwrap(),
RecordBatch::try_new(
mustafasrepo (Author):

Since this data is used to test first_value and last_value, it is constructed such that the first and last value results land in the 2nd and 3rd partitions. With this construction, we guarantee that the expected result is not accidental, and that merging works properly (it does not depend on the data insertion order). For a rough idea of how such per-partition batches are built, see the sketch below.
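A hedged sketch of building per-partition test batches with arrow-rs (the column names and values here are illustrative, not the test's exact data):

```rust
use std::sync::Arc;

use arrow::array::{Float64Array, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("amount", DataType::Float64, false),
        Field::new("ts", DataType::Int64, false),
    ]));

    // One RecordBatch per partition; the global first/last values are
    // deliberately placed in different partitions so a correct result
    // can only come from a proper merge, not from insertion order.
    let partition_a = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Float64Array::from(vec![11.0, 11.0])),
            Arc::new(Int64Array::from(vec![2, 5])),
        ],
    )?;
    let partition_b = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Float64Array::from(vec![12.0, 13.0])),
            Arc::new(Int64Array::from(vec![1, 9])),
        ],
    )?;

    assert_eq!(partition_a.num_rows(), 2);
    assert_eq!(partition_b.num_rows(), 2);
    Ok(())
}
```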

@@ -1960,21 +1960,20 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
--SortExec: expr=[col0@0 ASC NULLS LAST]
----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1]
------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
--------SortExec: expr=[col0@3 ASC NULLS LAST]
mustafasrepo (Author):

Generally, the SortExec immediately before an AggregateExec: mode=FinalPartitioned or AggregateExec: mode=Final is now removed from the plan.

Contributor:
I think it makes sense that there is nothing inserted between the two aggregate phases, given the aggregates themselves track the sortedness.

@@ -2760,8 +2772,8 @@ Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
physical_plan
ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv2]
--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
----SortPreservingMergeExec: [ts@0 ASC NULLS LAST]
mustafasrepo (Author):

Also, SortPreservingMergeExec is replaced with CoalescePartitionsExec, since in Final mode we no longer require any ordering on the input.

set datafusion.execution.target_partitions = 8;

query ?
SELECT ARRAY_AGG(e.rate ORDER BY e.sn)
mustafasrepo (Author):

This is the buggy query from the issue.

@@ -228,11 +231,13 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
.iter()
.map(|sort_expr| sort_expr.options)
.collect::<Vec<_>>();
self.values = merge_ordered_arrays(
let (new_values, new_orderings) = merge_ordered_arrays(
mustafasrepo (Author):

Since we no longer rely on an outside mechanism for merging, during merging we need to merge the orderings in addition to the values.
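The shape of that change, sketched with simplified types (the real merge_ordered_arrays operates on ScalarValue rows and per-column SortOptions; this is just the idea):

```rust
/// Merge several runs of (value, ordering-key) pairs, each run already
/// sorted by its keys, returning the merged values *and* the merged
/// ordering keys so the accumulator state stays self-describing.
fn merge_ordered_arrays(
    values: &[Vec<f64>],
    orderings: &[Vec<i64>],
) -> (Vec<f64>, Vec<i64>) {
    // Zip each run's values with its ordering keys...
    let mut rows: Vec<(i64, f64)> = values
        .iter()
        .zip(orderings)
        .flat_map(|(vs, ks)| ks.iter().copied().zip(vs.iter().copied()))
        .collect();
    // ...merge by key (a real implementation would use a streaming
    // k-way merge rather than a full re-sort)...
    rows.sort_by_key(|&(k, _)| k);
    // ...and unzip, keeping the keys alongside the values.
    rows.into_iter().map(|(k, v)| (v, k)).unzip()
}
```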

# Conflicts:
#	datafusion/physical-expr/src/aggregate/first_last.rs
alamb (Contributor) left a comment:

Thank you @mustafasrepo -- I took a quick look at this PR and will review it more carefully tomorrow

datafusion/core/src/physical_plan/aggregates/mod.rs (review thread, outdated and resolved)
@@ -52,7 +52,6 @@ impl GroupOrdering {
} = ordering;

Ok(match mode {
GroupByOrderMode::None => GroupOrdering::None,
Contributor:

I haven't reviewed this super carefully yet, but GroupOrderingNone was substantially faster than GroupOrderingPartial, because GroupOrderingPartial requires additional comparisons per group key.

Thus I would expect this change to slow down performance. I will run some performance tests next week.

mustafasrepo (Author), Jul 30, 2023:

I should have stressed that this change doesn't affect the code path for GroupOrderingNone. I noticed that we already store the ordering mode as an Option (see link). When this Option is None, it is equivalent to GroupByOrderMode::None, so I thought there is no need to keep track of AggregationMode::None separately. As can be seen in the link, when the option is None, GroupOrdering::None is used. If this is misleading, I can retract this change; it is not important for this PR.
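In other words (a hypothetical sketch of the reasoning, with made-up variant names -- not the actual DataFusion definitions):

```rust
enum GroupByOrderMode {
    PartiallyOrdered,
    FullyOrdered,
}

enum GroupOrdering {
    None,
    Partial,
    Full,
}

// Because the mode is already stored as an Option, `Option::None` can
// stand in for the removed explicit `None` variant; `GroupOrdering::None`
// (and its fast code path) is still produced in exactly the same cases.
fn build(mode: Option<GroupByOrderMode>) -> GroupOrdering {
    match mode {
        None => GroupOrdering::None,
        Some(GroupByOrderMode::PartiallyOrdered) => GroupOrdering::Partial,
        Some(GroupByOrderMode::FullyOrdered) => GroupOrdering::Full,
    }
}
```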

Contributor:

That makes sense. Sorry for my confusion

Contributor:

I got inspired / wanted some excuse to work on the code, so I made #7150 as a potential alternate approach

alamb (Contributor) left a comment:

I agree there is a problem here, and thank you for working on it @mustafasrepo.

I think there may still be a problem with trying to do a multi-phase group by for plans that require the input to be sorted. I left a comment below. Let me know what you think.

--------------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
alamb (Contributor):

I am not sure this new plan is correct either, as the data is not necessarily sorted for the final grouping. The old plan also seems wrong.

My reasoning is that for an aggregate like this:

FIRST_VALUE(amount ORDER BY ts ASC)

the input to the group operator has to be sorted on ts.

However, I don't see how the ORDER BY ts is preserved after the AggregateExec: mode=Partial first grouping phase:

------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 ** I think the order by `ts` is not preserved here, so the data is not ordered by ts for the final grouping **
------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]

Maybe the right solution would be to do a single-phase grouping when any of the aggregates has an ORDER BY clause:

AggregateExec: mode=Final, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ** no partial group by 
  SortExec: expr=[ts@1 ASC NULLS LAST]
    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
      MemoryExec: partitions=1, partition_sizes=[1]

🤔

mustafasrepo (Author):

Consider the table below:

amount  ts
12      1
11      2
13      3
12      4
11      5
13      6
12      7
11      8
13      9

Also assume we have 3 partitions, receiving the following data:

Partition 1:
amount  ts
11      2
11      5
11      8

Partition 2:
amount  ts
12      1
12      4
12      7

Partition 3:
amount  ts
13      3
13      6
13      9

Aggregate::Partial would produce the values (11, 2), (12, 1), (13, 3), one per partition, where the first element is the first_value for that partition and the second element is its corresponding ts value. In this case, Aggregate::Final would receive the following batch:

amount  ts of amount (partial result)
11      2
12      1
13      3

During the merge_batch method of first_value, the first value is calculated by considering the ts values corresponding to the amount from each partition. In our case, since the requirement is ts ASC, the first value should come from the row with the smallest ts (here, 1). Hence the result is 12. Note that ts at the final input and ts at the partial input do not correspond to the same column: ts at the final aggregation input comes from the state of the partial aggregation result.

In short, we delegated the responsibility for sorting to the merge_batch algorithm, because the column on which the sort would be performed is no longer valid at the final aggregation.
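The walk-through above, as a runnable sketch (simplified types; the real accumulator state uses ScalarValue):

```rust
/// Each partial state is (first_value, ts_of_first_value) for one partition.
fn merge_first_value(partial_states: &[(f64, i64)]) -> Option<f64> {
    // The requirement is `ts ASC`, so the merged first value is the one
    // whose stored ts is smallest across all partitions.
    partial_states
        .iter()
        .min_by_key(|&&(_, ts)| ts)
        .map(|&(v, _)| v)
}

fn main() {
    // Partial results from the three partitions in the example:
    let states = [(11.0, 2), (12.0, 1), (13.0, 3)];
    // ts = 1 is the smallest, so the global FIRST_VALUE is 12.
    assert_eq!(merge_first_value(&states), Some(12.0));
}
```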

> Maybe the right solution would be to do a single phase grouping when any of the aggregates have an ORDER BY clause

That would certainly work. However, I wanted to use the existing parallelization as much as possible; hence, I wanted to make the aggregators work in Partial and Final modes.

alamb (Contributor):

Ah, I forgot that the first_value and last_value aggregators store the value of the ordering internally https://github.com/apache/arrow-datafusion/blob/504f24767486b8bf9cb08dd54b829b1654f1054f/datafusion/physical-expr/src/aggregate/first_last.rs#L155-L161
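For reference, a rough sketch of that idea (hypothetical field names; the linked source shows the actual definition):

```rust
/// The accumulator remembers not only the candidate first value but also
/// the values of the ORDER BY expressions it was observed at, so that
/// `merge_batch` can compare candidates coming from different partitions.
struct FirstValueAccumulator {
    first: Option<f64>,
    // Ordering-expression values (e.g. `ts`) corresponding to `first`.
    ordering_values: Vec<i64>,
}
```

Because the ordering values travel with the state, the Final stage can pick the correct global first value even though the original ts column is absent from its input schema.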

Makes sense -- I will review the rest of the changes in this PR carefully tomorrow

alamb (Contributor) left a comment:

Makes sense to me -- thank you both for the fix @mustafasrepo, as well as for helping me understand what was going on.

datafusion/core/src/physical_plan/aggregates/mod.rs (review thread, outdated and resolved)

ozankabak (Contributor) left a comment:

I also went over the code carefully and it looks good to me as well. Ready for merge from my perspective after CI passes.

@mustafasrepo mustafasrepo merged commit e39b5ca into apache:main Aug 1, 2023
21 checks passed
@mustafasrepo mustafasrepo deleted the bug_fix/aggregate_cannot_sort_decimal_err branch August 2, 2023 12:33
Linked issue (#7128): Aggregate Final Mode requires invalid columns when working in multi partitions