From 54df9581ed4dd8591259b4a9a0b6e4230e84a4d2 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 19 Jan 2024 16:17:31 -0800 Subject: [PATCH 1/2] Simplify windows builtin functions --- datafusion/core/src/physical_planner.rs | 25 ++++--- .../core/tests/fuzz_cases/window_fuzz.rs | 41 +++++++++++- .../expr/src/built_in_window_function.rs | 4 +- .../physical-expr/src/window/cume_dist.rs | 13 ++-- datafusion/physical-expr/src/window/ntile.rs | 12 ++-- datafusion/physical-expr/src/window/rank.rs | 22 +++---- .../physical-expr/src/window/row_number.rs | 15 +++-- datafusion/physical-plan/src/windows/mod.rs | 42 +++++++----- .../tests/cases/roundtrip_physical_plan.rs | 3 +- datafusion/sqllogictest/test_files/window.slt | 66 +++++++++++++++++++ 10 files changed, 180 insertions(+), 63 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 98390ac271d0..bc4715da31ea 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -86,6 +86,7 @@ use datafusion_expr::expr::{ }; use datafusion_expr::expr_rewriter::unnormalize_cols; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; +use datafusion_expr::utils::exprlist_to_fields; use datafusion_expr::{ DescribeTable, DmlStatement, ScalarFunctionDefinition, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, @@ -719,14 +720,16 @@ impl DefaultPhysicalPlanner { } let logical_input_schema = input.schema(); - let physical_input_schema = input_exec.schema(); + // Extend the schema to include window expression fields as builtin window functions derives its datatype from incoming schema + let mut window_fields = logical_input_schema.fields().clone(); + window_fields.extend_from_slice(&exprlist_to_fields(window_expr.iter(), input)?); + let extended_schema = &DFSchema::new_with_metadata(window_fields, HashMap::new())?; let window_expr = window_expr .iter() .map(|e| { create_window_expr( e, - logical_input_schema, - &physical_input_schema, + extended_schema, session_state.execution_props(), ) }) @@ -1526,7 +1529,7 @@ fn get_physical_expr_pair( /// queries like: /// OVER (ORDER BY a RANGES BETWEEN 3 PRECEDING AND 5 PRECEDING) /// OVER (ORDER BY a RANGES BETWEEN INTERVAL '3 DAY' PRECEDING AND '5 DAY' PRECEDING) are rejected -pub fn is_window_valid(window_frame: &WindowFrame) -> bool { +pub fn is_window_frame_bound_valid(window_frame: &WindowFrame) -> bool { match (&window_frame.start_bound, &window_frame.end_bound) { (WindowFrameBound::Following(_), WindowFrameBound::Preceding(_)) | (WindowFrameBound::Following(_), WindowFrameBound::CurrentRow) @@ -1546,10 +1549,10 @@ pub fn create_window_expr_with_name( e: &Expr, name: impl Into, logical_input_schema: &DFSchema, - physical_input_schema: &Schema, execution_props: &ExecutionProps, ) -> Result> { let name = name.into(); + let physical_input_schema: &Schema = &logical_input_schema.into(); match e { Expr::WindowFunction(WindowFunction { fun, @@ -1572,7 +1575,8 @@ pub fn create_window_expr_with_name( create_physical_sort_expr(e, logical_input_schema, execution_props) }) .collect::>>()?; - if !is_window_valid(window_frame) { + + if !is_window_frame_bound_valid(window_frame) { return plan_err!( "Invalid window frame: start bound ({}) cannot be larger than end bound ({})", window_frame.start_bound, window_frame.end_bound @@ -1598,7 +1602,6 @@ pub fn create_window_expr_with_name( pub fn create_window_expr( e: &Expr, logical_input_schema: &DFSchema, - physical_input_schema: &Schema, execution_props: &ExecutionProps, ) -> Result> { // unpack aliased logical expressions, e.g. "sum(col) over () as total" @@ -1606,13 +1609,7 @@ pub fn create_window_expr( Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()), _ => (e.display_name()?, e), }; - create_window_expr_with_name( - e, - name, - logical_input_schema, - physical_input_schema, - execution_props, - ) + create_window_expr_with_name(e, name, logical_input_schema, execution_props) } type AggregateExprWithOptionalArgs = ( diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 6e5c5f8eb95e..4c440d6a5bfd 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -22,6 +22,7 @@ use arrow::compute::{concat_batches, SortOptions}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; +use arrow_schema::{Field, Schema}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::windows::{ @@ -37,6 +38,7 @@ use datafusion_expr::{ }; use datafusion_physical_expr::expressions::{cast, col, lit}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use itertools::Itertools; use test_utils::add_empty_batches; use hashbrown::HashMap; @@ -482,7 +484,6 @@ async fn run_window_test( let session_config = SessionConfig::new().with_batch_size(50); let ctx = SessionContext::new_with_config(session_config); let (window_fn, args, fn_name) = get_random_function(&schema, &mut rng, is_linear); - let window_frame = get_random_window_frame(&mut rng, is_linear); let mut orderby_exprs = vec![]; for column in &orderby_columns { @@ -532,6 +533,40 @@ async fn run_window_test( if is_linear { exec1 = Arc::new(SortExec::new(sort_keys.clone(), exec1)) as _; } + + // The schema needs to be enriched before the `create_window_expr` + // The reason for this is window expressions datatypes are derived from the schema + // The datafusion code enriches the schema on physical planner and this test copies the same behavior manually + // Also bunch of functions dont require input arguments thus just send an empty vec for such functions + let data_types = if [ + "row_number", + "rank", + "dense_rank", + "percent_rank", + "ntile", + "cume_dist", + ] + .contains(&fn_name.as_str()) + { + vec![] + } else { + args.iter() + .map(|e| e.clone().as_ref().data_type(&schema)) + .collect::>>()? + }; + let window_expr_return_type = window_fn.return_type(&data_types)?; + let mut window_fields = schema + .fields() + .iter() + .map(|f| f.as_ref().clone()) + .collect_vec(); + window_fields.extend_from_slice(&[Field::new( + &fn_name, + window_expr_return_type, + true, + )]); + let extended_schema = Arc::new(Schema::new(window_fields)); + let usual_window_exec = Arc::new( WindowAggExec::try_new( vec![create_window_expr( @@ -541,7 +576,7 @@ async fn run_window_test( &partitionby_exprs, &orderby_exprs, Arc::new(window_frame.clone()), - schema.as_ref(), + &extended_schema, ) .unwrap()], exec1, @@ -563,7 +598,7 @@ async fn run_window_test( &partitionby_exprs, &orderby_exprs, Arc::new(window_frame.clone()), - schema.as_ref(), + extended_schema.as_ref(), ) .unwrap()], exec2, diff --git a/datafusion/expr/src/built_in_window_function.rs b/datafusion/expr/src/built_in_window_function.rs index a03e3d2d24a9..f4b1cd03db1f 100644 --- a/datafusion/expr/src/built_in_window_function.rs +++ b/datafusion/expr/src/built_in_window_function.rs @@ -133,11 +133,11 @@ impl BuiltInWindowFunction { match self { BuiltInWindowFunction::RowNumber | BuiltInWindowFunction::Rank - | BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64), + | BuiltInWindowFunction::DenseRank + | BuiltInWindowFunction::Ntile => Ok(DataType::UInt64), BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => { Ok(DataType::Float64) } - BuiltInWindowFunction::Ntile => Ok(DataType::UInt64), BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead | BuiltInWindowFunction::FirstValue diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs index edef77c51c31..35f7311c441d 100644 --- a/datafusion/physical-expr/src/window/cume_dist.rs +++ b/datafusion/physical-expr/src/window/cume_dist.rs @@ -34,11 +34,15 @@ use std::sync::Arc; #[derive(Debug)] pub struct CumeDist { name: String, + data_type: DataType, } /// Create a cume_dist window function -pub fn cume_dist(name: String) -> CumeDist { - CumeDist { name } +pub fn cume_dist(name: String, data_type: &DataType) -> CumeDist { + CumeDist { + name, + data_type: data_type.clone(), + } } impl BuiltInWindowFunctionExpr for CumeDist { @@ -49,8 +53,7 @@ impl BuiltInWindowFunctionExpr for CumeDist { fn field(&self) -> Result { let nullable = false; - let data_type = DataType::Float64; - Ok(Field::new(self.name(), data_type, nullable)) + Ok(Field::new(self.name(), self.data_type.clone(), nullable)) } fn expressions(&self) -> Vec> { @@ -119,7 +122,7 @@ mod tests { #[test] #[allow(clippy::single_range_in_vec_init)] fn test_cume_dist() -> Result<()> { - let r = cume_dist("arr".into()); + let r = cume_dist("arr".into(), &DataType::Float64); let expected = vec![0.0; 0]; test_i32_result(&r, 0, vec![], expected)?; diff --git a/datafusion/physical-expr/src/window/ntile.rs b/datafusion/physical-expr/src/window/ntile.rs index f5442e1b0fee..f6ab3a27201f 100644 --- a/datafusion/physical-expr/src/window/ntile.rs +++ b/datafusion/physical-expr/src/window/ntile.rs @@ -35,11 +35,16 @@ use std::sync::Arc; pub struct Ntile { name: String, n: u64, + data_type: DataType, } impl Ntile { - pub fn new(name: String, n: u64) -> Self { - Self { name, n } + pub fn new(name: String, n: u64, data_type: &DataType) -> Self { + Self { + name, + n, + data_type: data_type.clone(), + } } pub fn get_n(&self) -> u64 { @@ -54,8 +59,7 @@ impl BuiltInWindowFunctionExpr for Ntile { fn field(&self) -> Result { let nullable = false; - let data_type = DataType::UInt64; - Ok(Field::new(self.name(), data_type, nullable)) + Ok(Field::new(self.name(), self.data_type.clone(), nullable)) } fn expressions(&self) -> Vec> { diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index 86af5b322133..351c5811c15f 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -41,6 +41,7 @@ use std::sync::Arc; pub struct Rank { name: String, rank_type: RankType, + data_type: DataType, } impl Rank { @@ -58,26 +59,29 @@ pub enum RankType { } /// Create a rank window function -pub fn rank(name: String) -> Rank { +pub fn rank(name: String, data_type: &DataType) -> Rank { Rank { name, rank_type: RankType::Basic, + data_type: data_type.clone(), } } /// Create a dense rank window function -pub fn dense_rank(name: String) -> Rank { +pub fn dense_rank(name: String, data_type: &DataType) -> Rank { Rank { name, rank_type: RankType::Dense, + data_type: data_type.clone(), } } /// Create a percent rank window function -pub fn percent_rank(name: String) -> Rank { +pub fn percent_rank(name: String, data_type: &DataType) -> Rank { Rank { name, rank_type: RankType::Percent, + data_type: data_type.clone(), } } @@ -89,11 +93,7 @@ impl BuiltInWindowFunctionExpr for Rank { fn field(&self) -> Result { let nullable = false; - let data_type = match self.rank_type { - RankType::Basic | RankType::Dense => DataType::UInt64, - RankType::Percent => DataType::Float64, - }; - Ok(Field::new(self.name(), data_type, nullable)) + Ok(Field::new(self.name(), self.data_type.clone(), nullable)) } fn expressions(&self) -> Vec> { @@ -268,7 +268,7 @@ mod tests { #[test] fn test_dense_rank() -> Result<()> { - let r = dense_rank("arr".into()); + let r = dense_rank("arr".into(), &DataType::UInt64); test_without_rank(&r, vec![1; 8])?; test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?; Ok(()) @@ -276,7 +276,7 @@ mod tests { #[test] fn test_rank() -> Result<()> { - let r = rank("arr".into()); + let r = rank("arr".into(), &DataType::UInt64); test_without_rank(&r, vec![1; 8])?; test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?; Ok(()) @@ -285,7 +285,7 @@ mod tests { #[test] #[allow(clippy::single_range_in_vec_init)] fn test_percent_rank() -> Result<()> { - let r = percent_rank("arr".into()); + let r = percent_rank("arr".into(), &DataType::Float64); // empty case let expected = vec![0.0; 0]; diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index f5e2f65a656e..1826707b31c6 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -36,12 +36,16 @@ use std::sync::Arc; #[derive(Debug)] pub struct RowNumber { name: String, + data_type: DataType, } impl RowNumber { /// Create a new ROW_NUMBER function - pub fn new(name: impl Into) -> Self { - Self { name: name.into() } + pub fn new(name: impl Into, data_type: &DataType) -> Self { + Self { + name: name.into(), + data_type: data_type.clone(), + } } } @@ -53,8 +57,7 @@ impl BuiltInWindowFunctionExpr for RowNumber { fn field(&self) -> Result { let nullable = false; - let data_type = DataType::UInt64; - Ok(Field::new(self.name(), data_type, nullable)) + Ok(Field::new(self.name(), self.data_type.clone(), nullable)) } fn expressions(&self) -> Vec> { @@ -127,7 +130,7 @@ mod tests { ])); let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, true)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; - let row_number = RowNumber::new("row_number".to_owned()); + let row_number = RowNumber::new("row_number".to_owned(), &DataType::UInt64); let values = row_number.evaluate_args(&batch)?; let result = row_number .create_evaluator()? @@ -145,7 +148,7 @@ mod tests { ])); let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; - let row_number = RowNumber::new("row_number".to_owned()); + let row_number = RowNumber::new("row_number".to_owned(), &DataType::UInt64); let values = row_number.evaluate_args(&batch)?; let result = row_number .create_evaluator()? diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index a85e5cc31c58..e55cc7fca7a6 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -160,12 +160,13 @@ fn create_built_in_window_expr( input_schema: &Schema, name: String, ) -> Result> { + let data_type = input_schema.field_with_name(&name)?.data_type(); Ok(match fun { - BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)), - BuiltInWindowFunction::Rank => Arc::new(rank(name)), - BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)), - BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name)), - BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)), + BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name, data_type)), + BuiltInWindowFunction::Rank => Arc::new(rank(name, data_type)), + BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name, data_type)), + BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name, data_type)), + BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name, data_type)), BuiltInWindowFunction::Ntile => { let n = get_scalar_value_from_args(args, 0)?.ok_or_else(|| { DataFusionError::Execution( @@ -179,32 +180,42 @@ fn create_built_in_window_expr( if n.is_unsigned() { let n: u64 = n.try_into()?; - Arc::new(Ntile::new(name, n)) + Arc::new(Ntile::new(name, n, data_type)) } else { let n: i64 = n.try_into()?; if n <= 0 { return exec_err!("NTILE requires a positive integer"); } - Arc::new(Ntile::new(name, n as u64)) + Arc::new(Ntile::new(name, n as u64, data_type)) } } BuiltInWindowFunction::Lag => { let arg = args[0].clone(); - let data_type = args[0].data_type(input_schema)?; let shift_offset = get_scalar_value_from_args(args, 1)? .map(|v| v.try_into()) .and_then(|v| v.ok()); let default_value = get_scalar_value_from_args(args, 2)?; - Arc::new(lag(name, data_type, arg, shift_offset, default_value)) + Arc::new(lag( + name, + data_type.clone(), + arg, + shift_offset, + default_value, + )) } BuiltInWindowFunction::Lead => { let arg = args[0].clone(); - let data_type = args[0].data_type(input_schema)?; let shift_offset = get_scalar_value_from_args(args, 1)? .map(|v| v.try_into()) .and_then(|v| v.ok()); let default_value = get_scalar_value_from_args(args, 2)?; - Arc::new(lead(name, data_type, arg, shift_offset, default_value)) + Arc::new(lead( + name, + data_type.clone(), + arg, + shift_offset, + default_value, + )) } BuiltInWindowFunction::NthValue => { let arg = args[0].clone(); @@ -214,18 +225,15 @@ fn create_built_in_window_expr( .try_into() .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; let n: u32 = n as u32; - let data_type = args[0].data_type(input_schema)?; - Arc::new(NthValue::nth(name, arg, data_type, n)?) + Arc::new(NthValue::nth(name, arg, data_type.clone(), n)?) } BuiltInWindowFunction::FirstValue => { let arg = args[0].clone(); - let data_type = args[0].data_type(input_schema)?; - Arc::new(NthValue::first(name, arg, data_type)) + Arc::new(NthValue::first(name, arg, data_type.clone())) } BuiltInWindowFunction::LastValue => { let arg = args[0].clone(); - let data_type = args[0].data_type(input_schema)?; - Arc::new(NthValue::last(name, arg, data_type)) + Arc::new(NthValue::last(name, arg, data_type.clone())) } }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3a13dc887f0c..8e0f75ce7d11 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -253,7 +253,8 @@ fn roundtrip_nested_loop_join() -> Result<()> { fn roundtrip_window() -> Result<()> { let field_a = Field::new("a", DataType::Int64, false); let field_b = Field::new("b", DataType::Int64, false); - let schema = Arc::new(Schema::new(vec![field_a, field_b])); + let field_c = Field::new("FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", DataType::Int64, false); + let schema = Arc::new(Schema::new(vec![field_a, field_b, field_c])); let window_frame = WindowFrame::new_bounds( datafusion_expr::WindowFrameUnits::Range, diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index f8337e21d703..f6d8a1ce8fff 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3906,3 +3906,69 @@ ProjectionExec: expr=[sn@0 as sn, ts@1 as ts, currency@2 as currency, amount@3 a --BoundedWindowAggExec: wdw=[SUM(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(table_with_pk.amount) ORDER BY [table_with_pk.sn ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow }], mode=[Sorted] ----SortExec: expr=[sn@0 ASC NULLS LAST] ------MemoryExec: partitions=1, partition_sizes=[1] + +# test ROW_NUMBER window function returns correct data_type +query T +select arrow_typeof(row_number() over ()) from (select 1 a) +---- +UInt64 + +# test RANK window function returns correct data_type +query T +select arrow_typeof(rank() over ()) from (select 1 a) +---- +UInt64 + +# test DENSE_RANK window function returns correct data_type +query T +select arrow_typeof(dense_rank() over ()) from (select 1 a) +---- +UInt64 + +# test PERCENT_RANK window function returns correct data_type +query T +select arrow_typeof(percent_rank() over ()) from (select 1 a) +---- +Float64 + +# test CUME_DIST window function returns correct data_type +query T +select arrow_typeof(cume_dist() over ()) from (select 1 a) +---- +Float64 + +# test NTILE window function returns correct data_type +query T +select arrow_typeof(ntile(1) over ()) from (select 1 a) +---- +UInt64 + +# test LAG window function returns correct data_type +query T +select arrow_typeof(lag(a) over ()) from (select 1 a) +---- +Int64 + +# test LEAD window function returns correct data_type +query T +select arrow_typeof(lead(a) over ()) from (select 1 a) +---- +Int64 + +# test FIRST_VALUE window function returns correct data_type +query T +select arrow_typeof(first_value(a) over ()) from (select 1 a) +---- +Int64 + +# test LAST_VALUE window function returns correct data_type +query T +select arrow_typeof(last_value(a) over ()) from (select 1 a) +---- +Int64 + +# test NTH_VALUE window function returns correct data_type +query T +select arrow_typeof(nth_value(a, 1) over ()) from (select 1 a) +---- +Int64 \ No newline at end of file From 91cd25eea2d00827de8a1b32019f6c3e2d8a06a2 Mon Sep 17 00:00:00 2001 From: comphead Date: Sun, 21 Jan 2024 10:47:00 -0800 Subject: [PATCH 2/2] add field comments --- datafusion/physical-expr/src/window/cume_dist.rs | 1 + datafusion/physical-expr/src/window/lead_lag.rs | 1 + datafusion/physical-expr/src/window/nth_value.rs | 1 + datafusion/physical-expr/src/window/ntile.rs | 1 + datafusion/physical-expr/src/window/rank.rs | 1 + datafusion/physical-expr/src/window/row_number.rs | 1 + 6 files changed, 6 insertions(+) diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs index 35f7311c441d..9720187ea83d 100644 --- a/datafusion/physical-expr/src/window/cume_dist.rs +++ b/datafusion/physical-expr/src/window/cume_dist.rs @@ -34,6 +34,7 @@ use std::sync::Arc; #[derive(Debug)] pub struct CumeDist { name: String, + /// Output data type data_type: DataType, } diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index 7ee736ce9caa..054a4c13e6b6 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -35,6 +35,7 @@ use std::sync::Arc; #[derive(Debug)] pub struct WindowShift { name: String, + /// Output data type data_type: DataType, shift_offset: i64, expr: Arc, diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index b3c89122ebad..05909ab25a07 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -39,6 +39,7 @@ use datafusion_expr::PartitionEvaluator; pub struct NthValue { name: String, expr: Arc, + /// Output data type data_type: DataType, kind: NthValueKind, } diff --git a/datafusion/physical-expr/src/window/ntile.rs b/datafusion/physical-expr/src/window/ntile.rs index f6ab3a27201f..fb7a7ad84fb7 100644 --- a/datafusion/physical-expr/src/window/ntile.rs +++ b/datafusion/physical-expr/src/window/ntile.rs @@ -35,6 +35,7 @@ use std::sync::Arc; pub struct Ntile { name: String, n: u64, + /// Output data type data_type: DataType, } diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index 351c5811c15f..1f643f0280dc 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -41,6 +41,7 @@ use std::sync::Arc; pub struct Rank { name: String, rank_type: RankType, + /// Output data type data_type: DataType, } diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index 1826707b31c6..759f447ab0f8 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -36,6 +36,7 @@ use std::sync::Arc; #[derive(Debug)] pub struct RowNumber { name: String, + /// Output data type data_type: DataType, }