Skip to content

Commit

Permalink
Infer count() aggregation is not null
Browse files Browse the repository at this point in the history
`count([DISTINCT] [expr])` aggregate function never returns null. Infer
non-nullness of such aggregate expression. This allows elimination of
the HAVING filter for a query such as

    SELECT ... count(*) AS c
    FROM ...
    GROUP BY ...
    HAVING c IS NOT NULL
  • Loading branch information
findepi committed Jul 3, 2024
1 parent d44c7f2 commit 7328c38
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
130 changes: 130 additions & 0 deletions datafusion/optimizer/src/infer_non_null.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`InferNonNull`] infers which columns are non-nullable

use std::collections::HashSet;
use std::ops::Deref;
use std::sync::Arc;

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::{Aggregate, Expr, LogicalPlan};

#[derive(Default)]
pub struct InferNonNull {}

impl InferNonNull {
pub fn new() -> Self {
Self::default()
}
}

impl OptimizerRule for InferNonNull {
fn name(&self) -> &str {
"infer_non_null"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
if let LogicalPlan::Aggregate(ref aggregate) = plan {
let grouping_columns = aggregate.group_expr_len()?;
let new_non_null_fields: HashSet<_> = aggregate
.aggr_expr
.iter()
.enumerate()
.map(|(i, expr)| {
let field_index = grouping_columns + i;
if !plan.schema().field(field_index).is_nullable() {
// Already not nullable.
return None;
}
simple_aggregate_function(expr)
.filter(|function| {
function.func_def.name() == "count"
&& function.args.len() <= 1
})
.map(|_| field_index)
})
.flat_map(|x| x)
.collect();

if !new_non_null_fields.is_empty() {
let new_schema = Arc::new(DFSchema::new_with_metadata(
plan.schema()
.iter()
.enumerate()
.map(|(i, field)| {
let mut field = (field.0.cloned(), field.1.clone());
if new_non_null_fields.contains(&i) {
field = (
field.0,
Arc::new(
field.1.deref().clone().with_nullable(false),
),
);
}
field
})
.collect(),
plan.schema().metadata().clone(),
)?);

return Ok(Transformed::yes(LogicalPlan::Aggregate(
Aggregate::try_new_with_schema(
aggregate.input.clone(),
aggregate.group_expr.clone(),
aggregate.aggr_expr.clone(),
new_schema,
)?,
)));
}
}

if let LogicalPlan::Window(ref window) = plan {
// TODO similar to Aggregate
}

if let LogicalPlan::Filter(ref filter) = plan {
// TODO infer column being not null from filter predicates
}

Ok(Transformed::no(plan))
}
}

fn simple_aggregate_function(expr: &Expr) -> Option<&AggregateFunction> {
match expr {
Expr::AggregateFunction(ref aggregate_function) => Some(aggregate_function),
Expr::Alias(ref alias) => simple_aggregate_function(alias.expr.as_ref()),
_ => None,
}
}
1 change: 1 addition & 0 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod eliminate_one_union;
pub mod eliminate_outer_join;
pub mod extract_equijoin_predicate;
pub mod filter_null_join_keys;
pub mod infer_non_null;
pub mod optimize_projections;
pub mod optimizer;
pub mod propagate_empty_relation;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::eliminate_one_union::EliminateOneUnion;
use crate::eliminate_outer_join::EliminateOuterJoin;
use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::infer_non_null::InferNonNull;
use crate::optimize_projections::OptimizeProjections;
use crate::plan_signature::LogicalPlanSignature;
use crate::propagate_empty_relation::PropagateEmptyRelation;
Expand Down Expand Up @@ -245,6 +246,7 @@ impl Optimizer {
Arc::new(EliminateNestedUnion::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(InferNonNull::new()),
Arc::new(ReplaceDistinctWithAggregate::new()),
Arc::new(EliminateJoin::new()),
Arc::new(DecorrelatePredicateSubquery::new()),
Expand Down

0 comments on commit 7328c38

Please sign in to comment.