Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate Function Migration #12861

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion datafusion/functions-aggregate/src/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@ use datafusion_common::ScalarValue;
use datafusion_common::{
downcast_value, internal_err, not_impl_err, DataFusionError, Result,
};
use datafusion_expr::aggregate_doc_sections::DOC_SECTION_APPROXIMATE;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
use datafusion_expr::{
Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
};
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::OnceLock;
make_udaf_expr_and_func!(
ApproxDistinct,
approx_distinct,
Expand Down Expand Up @@ -303,4 +307,33 @@ impl AggregateUDFImpl for ApproxDistinct {
};
Ok(accumulator)
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_approx_distinct_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_approx_distinct_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_APPROXIMATE)
.with_description(
"Returns the approximate number of distinct input values calculated using the HyperLogLog algorithm.",
)
.with_syntax_example("approx_distinct(expression)")
.with_sql_example(r#"```sql
> SELECT approx_distinct(column_name) FROM table_name;
+-----------------------------------+
| approx_distinct(column_name) |
+-----------------------------------+
| 42 |
+-----------------------------------+
```"#,
)
.with_argument("expression", "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.")
.build()
.unwrap()
})
}
35 changes: 34 additions & 1 deletion datafusion/functions-aggregate/src/approx_median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

use std::any::Any;
use std::fmt::Debug;
use std::sync::OnceLock;

use arrow::{datatypes::DataType, datatypes::Field};
use arrow_schema::DataType::{Float64, UInt64};

use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_expr::aggregate_doc_sections::DOC_SECTION_APPROXIMATE;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
use datafusion_expr::{
Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
};

use crate::approx_percentile_cont::ApproxPercentileAccumulator;

Expand Down Expand Up @@ -116,4 +120,33 @@ impl AggregateUDFImpl for ApproxMedian {
acc_args.exprs[0].data_type(acc_args.schema)?,
)))
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_approx_median_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_approx_median_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_APPROXIMATE)
.with_description(
"Returns the approximate median (50th percentile) of input values. It is an alias of `approx_percentile_cont(x, 0.5)`.",
)
.with_syntax_example("approx_median(expression)")
.with_sql_example(r#"```sql
> SELECT approx_median(column_name) FROM table_name;
+-----------------------------------+
| approx_median(column_name) |
+-----------------------------------+
| 23.5 |
+-----------------------------------+
```"#,
)
.with_argument("expression", "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.")
.build()
.unwrap()
})
}
37 changes: 34 additions & 3 deletions datafusion/functions-aggregate/src/approx_percentile_cont.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arrow::array::{Array, RecordBatch};
use arrow::compute::{filter, is_not_null};
Expand All @@ -34,12 +34,13 @@ use datafusion_common::{
downcast_value, internal_err, not_impl_datafusion_err, not_impl_err, plan_err,
DataFusionError, Result, ScalarValue,
};
use datafusion_expr::aggregate_doc_sections::DOC_SECTION_APPROXIMATE;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::type_coercion::aggregates::{INTEGERS, NUMERICS};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
Accumulator, AggregateUDFImpl, ColumnarValue, Expr, Signature, TypeSignature,
Volatility,
Accumulator, AggregateUDFImpl, ColumnarValue, Documentation, Expr, Signature,
TypeSignature, Volatility,
};
use datafusion_functions_aggregate_common::tdigest::{
TDigest, TryIntoF64, DEFAULT_MAX_SIZE,
Expand Down Expand Up @@ -268,6 +269,36 @@ impl AggregateUDFImpl for ApproxPercentileCont {
}
Ok(arg_types[0].clone())
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_approx_percentile_cont_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_approx_percentile_cont_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_APPROXIMATE)
.with_description(
"Returns the approximate percentile of input values using the t-digest algorithm.",
)
.with_syntax_example("approx_percentile_cont(expression, percentile, centroids)")
.with_sql_example(r#"```sql
> SELECT approx_percentile_cont(column_name, 0.75, 100) FROM table_name;
+-------------------------------------------------+
| approx_percentile_cont(column_name, 0.75, 100) |
+-------------------------------------------------+
| 65.0 |
+-------------------------------------------------+
```"#)
.with_argument("expression", "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.")
.with_argument("percentile", "Percentile to compute. Must be a float value between 0 and 1 (inclusive).")
.with_argument("centroids", "Number of centroids to use in the t-digest algorithm. _Default is 100_. A higher number results in more accurate approximation but requires more memory.")
.build()
.unwrap()
})
}

#[derive(Debug)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arrow::{
array::ArrayRef,
Expand All @@ -26,10 +26,13 @@ use arrow::{

use datafusion_common::ScalarValue;
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_expr::aggregate_doc_sections::DOC_SECTION_APPROXIMATE;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::type_coercion::aggregates::NUMERICS;
use datafusion_expr::Volatility::Immutable;
use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, TypeSignature};
use datafusion_expr::{
Accumulator, AggregateUDFImpl, Documentation, Signature, TypeSignature,
};
use datafusion_functions_aggregate_common::tdigest::{
Centroid, TDigest, DEFAULT_MAX_SIZE,
};
Expand Down Expand Up @@ -151,6 +154,37 @@ impl AggregateUDFImpl for ApproxPercentileContWithWeight {
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
self.approx_percentile_cont.state_fields(args)
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_approx_percentile_cont_with_weight_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_approx_percentile_cont_with_weight_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_APPROXIMATE)
.with_description(
"Returns the weighted approximate percentile of input values using the t-digest algorithm.",
)
.with_syntax_example("approx_percentile_cont_with_weight(expression, weight, percentile)")
.with_sql_example(r#"```sql
> SELECT approx_percentile_cont_with_weight(column_name, weight_column, 0.90) FROM table_name;
+----------------------------------------------------------------------+
| approx_percentile_cont_with_weight(column_name, weight_column, 0.90) |
+----------------------------------------------------------------------+
| 78.5 |
+----------------------------------------------------------------------+
```"#,
)
.with_argument("expression", "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.")
.with_argument("weight", "Expression to use as weight. Can be a constant, column, or function, and any combination of arithmetic operators.")
.with_argument("percentile", "Percentile to compute. Must be a float value between 0 and 1 (inclusive).")
.build()
.unwrap()
})
}

#[derive(Debug)]
Expand Down
34 changes: 32 additions & 2 deletions datafusion/functions-aggregate/src/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ use datafusion_common::cast::as_list_array;
use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_common::{internal_err, Result};
use datafusion_expr::aggregate_doc_sections::DOC_SECTION_GENERAL;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::AggregateUDFImpl;
use datafusion_expr::{Accumulator, Signature, Volatility};
use datafusion_expr::{AggregateUDFImpl, Documentation};
use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays;
use datafusion_functions_aggregate_common::utils::ordering_fields;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

make_udaf_expr_and_func!(
ArrayAgg,
Expand Down Expand Up @@ -142,6 +143,35 @@ impl AggregateUDFImpl for ArrayAgg {
fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF {
datafusion_expr::ReversedUDAF::Reversed(array_agg_udaf())
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_array_agg_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_array_agg_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_GENERAL)
.with_description(
"Returns an array created from the expression elements. If ordering is required, elements are inserted in the specified order.",
)
.with_syntax_example("array_agg(expression [ORDER BY expression])")
.with_sql_example(r#"```sql
> SELECT array_agg(column_name ORDER BY other_column) FROM table_name;
+-----------------------------------------------+
| array_agg(column_name ORDER BY other_column) |
+-----------------------------------------------+
| [element1, element2, element3] |
+-----------------------------------------------+
```"#,
)
.with_argument("expression", "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.")
.build()
.unwrap()
})
}

#[derive(Debug)]
Expand Down
36 changes: 34 additions & 2 deletions datafusion/functions-aggregate/src/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ use arrow::datatypes::{
Float64Type, UInt64Type,
};
use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
use datafusion_expr::aggregate_doc_sections::DOC_SECTION_GENERAL;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::type_coercion::aggregates::{avg_return_type, coerce_avg_type};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::Volatility::Immutable;
use datafusion_expr::{
Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator,
ReversedUDAF, Signature,
};

use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
Expand All @@ -45,7 +47,7 @@ use datafusion_functions_aggregate_common::utils::DecimalAverager;
use log::debug;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

make_udaf_expr_and_func!(
Avg,
Expand Down Expand Up @@ -235,6 +237,36 @@ impl AggregateUDFImpl for Avg {
}
coerce_avg_type(self.name(), arg_types)
}

fn documentation(&self) -> Option<&Documentation> {
Some(get_avg_doc())
}
}

static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();

fn get_avg_doc() -> &'static Documentation {
DOCUMENTATION.get_or_init(|| {
Documentation::builder()
.with_doc_section(DOC_SECTION_GENERAL)
.with_description(
"Returns the average of numeric values in the specified column.",
)
.with_syntax_example("avg(expression)")
.with_sql_example(r#"```sql
> SELECT avg(column_name) FROM table_name;
+---------------------------+
| avg(column_name) |
+---------------------------+
| 42.75 |
+---------------------------+
```"#,
)
.with_argument("expression", "Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators.")
.with_argument("Aliases: ", "`mean`")
.build()
.unwrap()
})
}

/// An accumulator to compute the average
Expand Down
Loading