From f1d8f82440b3977e4172b04d75c7e05e00dadb20 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 12 Oct 2023 14:18:57 +0300 Subject: [PATCH 1/2] Initial commit --- .../core/src/physical_optimizer/enforce_sorting.rs | 1 - datafusion/core/src/physical_planner.rs | 1 - datafusion/physical-plan/src/windows/mod.rs | 1 - .../physical-plan/src/windows/window_agg_exec.rs | 14 ++------------ 4 files changed, 2 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 95ec1973d017..53422894f974 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -621,7 +621,6 @@ fn analyze_window_sort_removal( Arc::new(WindowAggExec::try_new( window_expr.to_vec(), window_child, - input_schema, partitionby_exprs.to_vec(), )?) as _ }; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 35119f374fa3..7850c12c7752 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -759,7 +759,6 @@ impl DefaultPhysicalPlanner { Arc::new(WindowAggExec::try_new( window_expr, input_exec, - physical_input_schema, physical_partition_keys, )?) }) diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 0f165f79354e..e4a5afac7864 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -435,7 +435,6 @@ pub fn get_best_fitting_window( Ok(Some(Arc::new(WindowAggExec::try_new( window_expr, input.clone(), - input.schema(), physical_partition_keys.to_vec(), )?) as _)) } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index b56a9c194c8f..b4dc8ec88c68 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -59,8 +59,6 @@ pub struct WindowAggExec { window_expr: Vec>, /// Schema after the window is run schema: SchemaRef, - /// Schema before the window - input_schema: SchemaRef, /// Partition Keys pub partition_keys: Vec>, /// Execution metrics @@ -75,10 +73,9 @@ impl WindowAggExec { pub fn try_new( window_expr: Vec>, input: Arc, - input_schema: SchemaRef, partition_keys: Vec>, ) -> Result { - let schema = create_schema(&input_schema, &window_expr)?; + let schema = create_schema(&input.schema(), &window_expr)?; let schema = Arc::new(schema); let ordered_partition_by_indices = @@ -87,7 +84,6 @@ impl WindowAggExec { input, window_expr, schema, - input_schema, partition_keys, metrics: ExecutionPlanMetricsSet::new(), ordered_partition_by_indices, @@ -104,11 +100,6 @@ impl WindowAggExec { &self.input } - /// Get the input schema before any window functions are applied - pub fn input_schema(&self) -> SchemaRef { - self.input_schema.clone() - } - /// Return the output sort order of partition keys: For example /// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a // We are sure that partition by columns are always at the beginning of sort_keys @@ -230,7 +221,6 @@ impl ExecutionPlan for WindowAggExec { Ok(Arc::new(WindowAggExec::try_new( self.window_expr.clone(), children[0].clone(), - self.input_schema.clone(), self.partition_keys.clone(), )?)) } @@ -259,7 +249,7 @@ impl ExecutionPlan for WindowAggExec { fn statistics(&self) -> Statistics { let input_stat = self.input.statistics(); let win_cols = self.window_expr.len(); - let input_cols = self.input_schema.fields().len(); + let input_cols = self.input.schema().fields().len(); // TODO stats: some windowing function will maintain invariants such as min, max... let mut column_statistics = Vec::with_capacity(win_cols + input_cols); if let Some(input_col_stats) = input_stat.column_statistics { From 4890cb805b58fb9b488016c87ff0fa60e523a91b Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Thu, 12 Oct 2023 14:32:05 +0300 Subject: [PATCH 2/2] Remove input schema from proto --- .../src/physical_optimizer/enforce_sorting.rs | 2 -- .../core/src/physical_optimizer/test_utils.rs | 1 - datafusion/core/src/physical_planner.rs | 1 - .../core/tests/fuzz_cases/window_fuzz.rs | 2 -- .../src/windows/bounded_window_agg_exec.rs | 14 ++------------ datafusion/physical-plan/src/windows/mod.rs | 2 -- datafusion/proto/proto/datafusion.proto | 1 - datafusion/proto/src/generated/pbjson.rs | 18 ------------------ datafusion/proto/src/generated/prost.rs | 2 -- datafusion/proto/src/physical_plan/mod.rs | 19 +------------------ .../tests/cases/roundtrip_physical_plan.rs | 1 - 11 files changed, 3 insertions(+), 60 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 53422894f974..28224680f7b1 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -608,12 +608,10 @@ fn analyze_window_sort_removal( add_sort_above(&mut window_child, sort_expr, None)?; let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory()); - let input_schema = window_child.schema(); let new_window = if uses_bounded_memory { Arc::new(BoundedWindowAggExec::try_new( window_expr.to_vec(), window_child, - input_schema, partitionby_exprs.to_vec(), PartitionSearchMode::Sorted, )?) as _ diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 0915fdbf1cd7..4671bff04832 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -238,7 +238,6 @@ pub fn bounded_window_exec( ) .unwrap()], input.clone(), - input.schema(), vec![], crate::physical_plan::windows::PartitionSearchMode::Sorted, ) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 7850c12c7752..325927bb7381 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -751,7 +751,6 @@ impl DefaultPhysicalPlanner { Arc::new(BoundedWindowAggExec::try_new( window_expr, input_exec, - physical_input_schema, physical_partition_keys, PartitionSearchMode::Sorted, )?) diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 1f0a4b09b15f..83c8e1f57896 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -461,7 +461,6 @@ async fn run_window_test( ) .unwrap()], exec1, - schema.clone(), vec![], ) .unwrap(), @@ -484,7 +483,6 @@ async fn run_window_test( ) .unwrap()], exec2, - schema.clone(), vec![], search_mode, ) diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index dfef0ddefa03..800ea42b3562 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -88,8 +88,6 @@ pub struct BoundedWindowAggExec { window_expr: Vec>, /// Schema after the window is run schema: SchemaRef, - /// Schema before the window - input_schema: SchemaRef, /// Partition Keys pub partition_keys: Vec>, /// Execution metrics @@ -110,11 +108,10 @@ impl BoundedWindowAggExec { pub fn try_new( window_expr: Vec>, input: Arc, - input_schema: SchemaRef, partition_keys: Vec>, partition_search_mode: PartitionSearchMode, ) -> Result { - let schema = create_schema(&input_schema, &window_expr)?; + let schema = create_schema(&input.schema(), &window_expr)?; let schema = Arc::new(schema); let partition_by_exprs = window_expr[0].partition_by(); let ordered_partition_by_indices = match &partition_search_mode { @@ -140,7 +137,6 @@ impl BoundedWindowAggExec { input, window_expr, schema, - input_schema, partition_keys, metrics: ExecutionPlanMetricsSet::new(), partition_search_mode, @@ -158,11 +154,6 @@ impl BoundedWindowAggExec { &self.input } - /// Get the input schema before any window functions are applied - pub fn input_schema(&self) -> SchemaRef { - self.input_schema.clone() - } - /// Return the output sort order of partition keys: For example /// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a // We are sure that partition by columns are always at the beginning of sort_keys @@ -303,7 +294,6 @@ impl ExecutionPlan for BoundedWindowAggExec { Ok(Arc::new(BoundedWindowAggExec::try_new( self.window_expr.clone(), children[0].clone(), - self.input_schema.clone(), self.partition_keys.clone(), self.partition_search_mode.clone(), )?)) @@ -333,7 +323,7 @@ impl ExecutionPlan for BoundedWindowAggExec { fn statistics(&self) -> Statistics { let input_stat = self.input.statistics(); let win_cols = self.window_expr.len(); - let input_cols = self.input_schema.fields().len(); + let input_cols = self.input.schema().fields().len(); // TODO stats: some windowing function will maintain invariants such as min, max... let mut column_statistics = Vec::with_capacity(win_cols + input_cols); if let Some(input_col_stats) = input_stat.column_statistics { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index e4a5afac7864..cc915e54af60 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -421,7 +421,6 @@ pub fn get_best_fitting_window( Ok(Some(Arc::new(BoundedWindowAggExec::try_new( window_expr, input.clone(), - input.schema(), physical_partition_keys.to_vec(), partition_search_mode, )?) as _)) @@ -758,7 +757,6 @@ mod tests { schema.as_ref(), )?], blocking_exec, - schema, vec![], )?); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index bda0f7828726..c60dae71ef86 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1419,7 +1419,6 @@ message PartiallySortedPartitionSearchMode { message WindowAggExecNode { PhysicalPlanNode input = 1; repeated PhysicalWindowExprNode window_expr = 2; - Schema input_schema = 4; repeated PhysicalExprNode partition_keys = 5; // Set optional to `None` for `BoundedWindowAggExec`. oneof partition_search_mode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index ced0c8bd7c7a..266075e68922 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -23969,9 +23969,6 @@ impl serde::Serialize for WindowAggExecNode { if !self.window_expr.is_empty() { len += 1; } - if self.input_schema.is_some() { - len += 1; - } if !self.partition_keys.is_empty() { len += 1; } @@ -23985,9 +23982,6 @@ impl serde::Serialize for WindowAggExecNode { if !self.window_expr.is_empty() { struct_ser.serialize_field("windowExpr", &self.window_expr)?; } - if let Some(v) = self.input_schema.as_ref() { - struct_ser.serialize_field("inputSchema", v)?; - } if !self.partition_keys.is_empty() { struct_ser.serialize_field("partitionKeys", &self.partition_keys)?; } @@ -24017,8 +24011,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { "input", "window_expr", "windowExpr", - "input_schema", - "inputSchema", "partition_keys", "partitionKeys", "linear", @@ -24031,7 +24023,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { enum GeneratedField { Input, WindowExpr, - InputSchema, PartitionKeys, Linear, PartiallySorted, @@ -24059,7 +24050,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { match value { "input" => Ok(GeneratedField::Input), "windowExpr" | "window_expr" => Ok(GeneratedField::WindowExpr), - "inputSchema" | "input_schema" => Ok(GeneratedField::InputSchema), "partitionKeys" | "partition_keys" => Ok(GeneratedField::PartitionKeys), "linear" => Ok(GeneratedField::Linear), "partiallySorted" | "partially_sorted" => Ok(GeneratedField::PartiallySorted), @@ -24085,7 +24075,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { { let mut input__ = None; let mut window_expr__ = None; - let mut input_schema__ = None; let mut partition_keys__ = None; let mut partition_search_mode__ = None; while let Some(k) = map_.next_key()? { @@ -24102,12 +24091,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { } window_expr__ = Some(map_.next_value()?); } - GeneratedField::InputSchema => { - if input_schema__.is_some() { - return Err(serde::de::Error::duplicate_field("inputSchema")); - } - input_schema__ = map_.next_value()?; - } GeneratedField::PartitionKeys => { if partition_keys__.is_some() { return Err(serde::de::Error::duplicate_field("partitionKeys")); @@ -24140,7 +24123,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode { Ok(WindowAggExecNode { input: input__, window_expr: window_expr__.unwrap_or_default(), - input_schema: input_schema__, partition_keys: partition_keys__.unwrap_or_default(), partition_search_mode: partition_search_mode__, }) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ca20cd35cb55..894afa570fb0 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1994,8 +1994,6 @@ pub struct WindowAggExecNode { pub input: ::core::option::Option<::prost::alloc::boxed::Box>, #[prost(message, repeated, tag = "2")] pub window_expr: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag = "4")] - pub input_schema: ::core::option::Option, #[prost(message, repeated, tag = "5")] pub partition_keys: ::prost::alloc::vec::Vec, /// Set optional to `None` for `BoundedWindowAggExec`. diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 8257f9aa3458..08010a3151ee 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -282,16 +282,7 @@ impl AsExecutionPlan for PhysicalPlanNode { runtime, extension_codec, )?; - let input_schema = window_agg - .input_schema - .as_ref() - .ok_or_else(|| { - DataFusionError::Internal( - "input_schema in WindowAggrNode is missing.".to_owned(), - ) - })? - .clone(); - let input_schema: SchemaRef = SchemaRef::new((&input_schema).try_into()?); + let input_schema = input.schema(); let physical_window_expr: Vec> = window_agg .window_expr @@ -333,7 +324,6 @@ impl AsExecutionPlan for PhysicalPlanNode { Ok(Arc::new(BoundedWindowAggExec::try_new( physical_window_expr, input, - input_schema, partition_keys, partition_search_mode, )?)) @@ -341,7 +331,6 @@ impl AsExecutionPlan for PhysicalPlanNode { Ok(Arc::new(WindowAggExec::try_new( physical_window_expr, input, - input_schema, partition_keys, )?)) } @@ -1315,8 +1304,6 @@ impl AsExecutionPlan for PhysicalPlanNode { extension_codec, )?; - let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?; - let window_expr = exec.window_expr() .iter() @@ -1334,7 +1321,6 @@ impl AsExecutionPlan for PhysicalPlanNode { protobuf::WindowAggExecNode { input: Some(Box::new(input)), window_expr, - input_schema: Some(input_schema), partition_keys, partition_search_mode: None, }, @@ -1346,8 +1332,6 @@ impl AsExecutionPlan for PhysicalPlanNode { extension_codec, )?; - let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?; - let window_expr = exec.window_expr() .iter() @@ -1385,7 +1369,6 @@ impl AsExecutionPlan for PhysicalPlanNode { protobuf::WindowAggExecNode { input: Some(Box::new(input)), window_expr, - input_schema: Some(input_schema), partition_keys, partition_search_mode: Some(partition_search_mode), }, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 77e77630bcb2..e30d416bdc95 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -275,7 +275,6 @@ fn roundtrip_window() -> Result<()> { sliding_aggr_window_expr, ], input, - schema.clone(), vec![col("b", &schema)?], )?)) }