Add ResolveGroupingAnalytics analyzer rule #5749

Closed · wants to merge 10 commits
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -69,6 +69,8 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
Expr::Literal(_)
| Expr::Alias(_, _)
| Expr::OuterReferenceColumn(_, _)
| Expr::HiddenColumn(_, _)
| Expr::HiddenExpr(_, _)
| Expr::ScalarVariable(_, _)
| Expr::Not(_)
| Expr::IsNotNull(_)
142 changes: 105 additions & 37 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -86,7 +86,11 @@ pub enum AggregateMode {
#[derive(Clone, Debug, Default)]
pub struct PhysicalGroupBy {
/// Distinct (Physical Expr, Alias) in the grouping set
expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
/// Hidden grouping set expr in the grouping set
hidden_grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
/// Distinct result expr for the grouping set, used to generate output schema
result_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
Review comment (Member):

I have a question: is result_expr a part of grouping_set_expr? If yes, maybe we can use an index vec that points into grouping_set_expr.
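A rough sketch of the index-based layout this comment hints at (hypothetical; the type name, field names, and import paths below are assumptions, not code from this PR):

use std::sync::Arc;
use datafusion::physical_plan::PhysicalExpr; // assumed re-export path

/// Hypothetical variant where `result_expr` is stored as indices into
/// `grouping_set_expr` instead of duplicating the expressions themselves.
pub struct PhysicalGroupByIdx {
    /// Distinct (PhysicalExpr, alias) pairs in the grouping set
    grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// Hidden grouping set expressions
    hidden_grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// Indices into `grouping_set_expr` that form the result expressions
    result_expr_indices: Vec<usize>,
    /// Corresponding NULL expressions
    null_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// Null mask for each group in this grouping set
    groups: Vec<Vec<bool>>,
}

impl PhysicalGroupByIdx {
    /// Materialize the result expressions by following the stored indices.
    pub fn result_expr(&self) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
        self.result_expr_indices
            .iter()
            .map(|&i| self.grouping_set_expr[i].clone())
            .collect()
    }
}

The trade-off is that callers would get an owned Vec instead of a borrowed slice, since the pairs are assembled on demand.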

/// Corresponding NULL expressions for expr
null_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
/// Null mask for each group in this grouping set. Each group is
@@ -99,12 +103,16 @@ pub struct PhysicalGroupBy {
impl PhysicalGroupBy {
/// Create a new `PhysicalGroupBy`
pub fn new(
expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
hidden_grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
result_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
null_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
groups: Vec<Vec<bool>>,
) -> Self {
Self {
expr,
grouping_set_expr,
hidden_grouping_set_expr,
result_expr,
null_expr,
groups,
}
@@ -115,7 +123,9 @@ impl PhysicalGroupBy {
pub fn new_single(expr: Vec<(Arc<dyn PhysicalExpr>, String)>) -> Self {
let num_exprs = expr.len();
Self {
expr,
grouping_set_expr: expr.clone(),
hidden_grouping_set_expr: vec![],
result_expr: expr,
null_expr: vec![],
groups: vec![vec![false; num_exprs]],
}
@@ -128,22 +138,32 @@ impl PhysicalGroupBy {

/// Returns the group expressions
pub fn expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
&self.expr
&self.grouping_set_expr
}

/// Returns the group result expressions
pub fn result_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
&self.result_expr
}

/// Returns the null expressions
pub fn null_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
&self.null_expr
}

/// Returns the hidden grouping set expressions
pub fn hidden_grouping_set_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
&self.hidden_grouping_set_expr
}

/// Returns the group null masks
pub fn groups(&self) -> &[Vec<bool>] {
&self.groups
}

/// Returns true if this `PhysicalGroupBy` has no group expressions
pub fn is_empty(&self) -> bool {
self.expr.is_empty()
self.grouping_set_expr.is_empty()
}
}

@@ -196,7 +216,7 @@ impl AggregateExec {
) -> Result<Self> {
let schema = create_schema(
&input.schema(),
&group_by.expr,
group_by.result_expr(),
&aggr_expr,
group_by.contains_null(),
mode,
@@ -205,7 +225,7 @@ impl AggregateExec {
let schema = Arc::new(schema);

let mut alias_map: HashMap<Column, Vec<Column>> = HashMap::new();
for (expression, name) in group_by.expr.iter() {
for (expression, name) in group_by.result_expr().iter() {
if let Some(column) = expression.as_any().downcast_ref::<Column>() {
let new_col_idx = schema.index_of(name)?;
// When the column name is the same, but index does not equal, treat it as Alias
@@ -243,7 +263,7 @@ impl AggregateExec {
// Update column indices. Since the group by columns come first in the output schema, their
// indices are simply 0..self.group_expr(len).
self.group_by
.expr()
.result_expr()
.iter()
.enumerate()
.map(|(index, (_col, name))| {
@@ -275,7 +295,7 @@ impl AggregateExec {
let batch_size = context.session_config().batch_size();
let input = self.input.execute(partition, Arc::clone(&context))?;
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
if self.group_by.expr.is_empty() {
if self.group_by.result_expr().is_empty() {
Ok(StreamType::AggregateStream(AggregateStream::new(
self.mode,
self.schema.clone(),
@@ -418,7 +438,7 @@ impl ExecutionPlan for AggregateExec {
write!(f, "AggregateExec: mode={:?}", self.mode)?;
let g: Vec<String> = if self.group_by.groups.len() == 1 {
self.group_by
.expr
.grouping_set_expr
.iter()
.map(|(e, alias)| {
let e = e.to_string();
@@ -447,7 +467,7 @@ impl ExecutionPlan for AggregateExec {
e
}
} else {
let (e, alias) = &self.group_by.expr[idx];
let (e, alias) =
&self.group_by.grouping_set_expr[idx];
let e = e.to_string();
if &e != alias {
format!("{e} as {alias}")
@@ -484,7 +505,7 @@ impl ExecutionPlan for AggregateExec {
// - aggregations sometimes also preserve invariants such as min, max...
match self.mode {
AggregateMode::Final | AggregateMode::FinalPartitioned
if self.group_by.expr.is_empty() =>
if self.group_by.result_expr().is_empty() =>
{
Statistics {
num_rows: Some(1),
@@ -671,16 +692,16 @@ fn evaluate_group_by(
group_by: &PhysicalGroupBy,
batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
let exprs: Vec<ArrayRef> = group_by
.expr
let exprs_value: Vec<ArrayRef> = group_by
.grouping_set_expr
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
Ok(value.into_array(batch.num_rows()))
})
.collect::<Result<Vec<_>>>()?;

let null_exprs: Vec<ArrayRef> = group_by
let null_exprs_value: Vec<ArrayRef> = group_by
.null_expr
.iter()
.map(|(expr, _)| {
@@ -689,23 +710,61 @@
})
.collect::<Result<Vec<_>>>()?;

Ok(group_by
.groups
.iter()
.map(|group| {
group
.iter()
.enumerate()
.map(|(idx, is_null)| {
if *is_null {
null_exprs[idx].clone()
} else {
exprs[idx].clone()
}
})
.collect()
})
.collect())
if !group_by.hidden_grouping_set_expr().is_empty() {
let hidden_exprs_value: Vec<ArrayRef> = group_by
.hidden_grouping_set_expr
.iter()
.map(|(expr, _)| {
let value = expr.evaluate(batch)?;
Ok(value.into_array(batch.num_rows()))
})
.collect::<Result<Vec<_>>>()?;

let chunk_size = hidden_exprs_value.len() / group_by.groups.len();
let hidden_expr_value_chunks =
hidden_exprs_value.chunks(chunk_size).collect::<Vec<_>>();

Ok(group_by
.groups
.iter()
.enumerate()
.map(|(group_id, group)| {
let mut group_data = group
.iter()
.enumerate()
.map(|(idx, is_null)| {
if *is_null {
null_exprs_value[idx].clone()
} else {
exprs_value[idx].clone()
}
})
.collect::<Vec<_>>();
Review comment (Member) on lines +732 to +742:

We can extract a function named get_group_data.

for data in hidden_expr_value_chunks[group_id] {
group_data.push(data.clone());
}
group_data
})
.collect())
} else {
Ok(group_by
.groups
.iter()
.map(|group| {
group
.iter()
.enumerate()
.map(|(idx, is_null)| {
if *is_null {
null_exprs_value[idx].clone()
} else {
exprs_value[idx].clone()
}
})
.collect::<Vec<_>>()
Review comment (Member) on lines +754 to +764:

Use the get_group_data function here as well.

})
.collect())
}
}
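A possible shape for the get_group_data helper that both review comments above suggest (a sketch under an assumed name and signature, not code from this PR):

use arrow::array::ArrayRef;

/// Build the per-group arrays for one entry of the null mask, picking the
/// NULL placeholder array wherever the mask is set and the evaluated
/// grouping-set array otherwise.
fn get_group_data(
    group: &[bool],
    exprs_value: &[ArrayRef],
    null_exprs_value: &[ArrayRef],
) -> Vec<ArrayRef> {
    group
        .iter()
        .enumerate()
        .map(|(idx, is_null)| {
            if *is_null {
                null_exprs_value[idx].clone()
            } else {
                exprs_value[idx].clone()
            }
        })
        .collect()
}

With such a helper, the two branches of evaluate_group_by would differ only in whether the hidden-expression chunk for the group is appended to the result of get_group_data afterwards.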

#[cfg(test)]
@@ -775,7 +834,12 @@ mod tests {
let input_schema = input.schema();

let grouping_set = PhysicalGroupBy {
expr: vec![
grouping_set_expr: vec![
(col("a", &input_schema)?, "a".to_string()),
(col("b", &input_schema)?, "b".to_string()),
],
hidden_grouping_set_expr: vec![],
result_expr: vec![
(col("a", &input_schema)?, "a".to_string()),
(col("b", &input_schema)?, "b".to_string()),
],
@@ -890,9 +954,11 @@ mod tests {
let input_schema = input.schema();

let grouping_set = PhysicalGroupBy {
expr: vec![(col("a", &input_schema)?, "a".to_string())],
grouping_set_expr: vec![(col("a", &input_schema)?, "a".to_string())],
hidden_grouping_set_expr: vec![],
null_expr: vec![],
groups: vec![vec![false]],
result_expr: vec![(col("a", &input_schema)?, "a".to_string())],
};

let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
@@ -929,7 +995,7 @@ mod tests {
let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));

let final_group: Vec<(Arc<dyn PhysicalExpr>, String)> = grouping_set
.expr
.result_expr()
.iter()
.map(|(_expr, name)| Ok((col(name, &input_schema)?, name.clone())))
.collect::<Result<_>>()?;
@@ -1119,9 +1185,11 @@ mod tests {

let groups_none = PhysicalGroupBy::default();
let groups_some = PhysicalGroupBy {
expr: vec![(col("a", &input_schema)?, "a".to_string())],
grouping_set_expr: vec![(col("a", &input_schema)?, "a".to_string())],
hidden_grouping_set_expr: vec![],
null_expr: vec![],
groups: vec![vec![false]],
result_expr: vec![(col("a", &input_schema)?, "a".to_string())],
};

// something that allocates within the aggregator
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -134,7 +134,7 @@ impl GroupedHashAggregateStream {
) -> Result<Self> {
let timer = baseline_metrics.elapsed_compute().timer();

let mut start_idx = group_by.expr.len();
let mut start_idx = group_by.result_expr().len();
let mut row_aggr_expr = vec![];
let mut row_agg_indices = vec![];
let mut row_aggregate_expressions = vec![];
@@ -175,7 +175,8 @@ impl GroupedHashAggregateStream {

let row_aggr_schema = aggr_state_schema(&row_aggr_expr)?;

let group_schema = group_schema(&schema, group_by.expr.len());
let group_schema = group_schema(&schema, group_by.result_expr().len());

let row_converter = RowConverter::new(
group_schema
.fields()