Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Minor]: Remove input_schema field from window executor #7810

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,20 +608,17 @@ fn analyze_window_sort_removal(
add_sort_above(&mut window_child, sort_expr, None)?;

let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
let input_schema = window_child.schema();
let new_window = if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
window_expr.to_vec(),
window_child,
input_schema,
partitionby_exprs.to_vec(),
PartitionSearchMode::Sorted,
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
window_expr.to_vec(),
window_child,
input_schema,
partitionby_exprs.to_vec(),
)?) as _
};
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ pub fn bounded_window_exec(
)
.unwrap()],
input.clone(),
input.schema(),
vec![],
crate::physical_plan::windows::PartitionSearchMode::Sorted,
)
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,15 +751,13 @@ impl DefaultPhysicalPlanner {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
PartitionSearchMode::Sorted,
)?)
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
)?)
})
Expand Down
2 changes: 0 additions & 2 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,6 @@ async fn run_window_test(
)
.unwrap()],
exec1,
schema.clone(),
vec![],
)
.unwrap(),
Expand All @@ -484,7 +483,6 @@ async fn run_window_test(
)
.unwrap()],
exec2,
schema.clone(),
vec![],
search_mode,
)
Expand Down
14 changes: 2 additions & 12 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ pub struct BoundedWindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
/// Schema before the window
input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
Expand All @@ -110,11 +108,10 @@ impl BoundedWindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
partition_search_mode: PartitionSearchMode,
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);
let partition_by_exprs = window_expr[0].partition_by();
let ordered_partition_by_indices = match &partition_search_mode {
Expand All @@ -140,7 +137,6 @@ impl BoundedWindowAggExec {
input,
window_expr,
schema,
input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
partition_search_mode,
Expand All @@ -158,11 +154,6 @@ impl BoundedWindowAggExec {
&self.input
}

/// Get the input schema before any window functions are applied
pub fn input_schema(&self) -> SchemaRef {
self.input_schema.clone()
}

/// Returns the output sort order of the partition keys. For example,
/// `OVER(PARTITION BY a ORDER BY b)` would give the sort order of column `a`.
// The PARTITION BY columns are always at the beginning of sort_keys.
Expand Down Expand Up @@ -303,7 +294,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
Ok(Arc::new(BoundedWindowAggExec::try_new(
self.window_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
self.partition_search_mode.clone(),
)?))
Expand Down Expand Up @@ -333,7 +323,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
let input_cols = self.input_schema.fields().len();
let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
if let Some(input_col_stats) = input_stat.column_statistics {
Expand Down
3 changes: 0 additions & 3 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,6 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input.clone(),
input.schema(),
physical_partition_keys.to_vec(),
partition_search_mode,
)?) as _))
Expand All @@ -435,7 +434,6 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(WindowAggExec::try_new(
window_expr,
input.clone(),
input.schema(),
physical_partition_keys.to_vec(),
)?) as _))
}
Expand Down Expand Up @@ -759,7 +757,6 @@ mod tests {
schema.as_ref(),
)?],
blocking_exec,
schema,
vec![],
)?);

Expand Down
14 changes: 2 additions & 12 deletions datafusion/physical-plan/src/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ pub struct WindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
/// Schema before the window
input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
Expand All @@ -75,10 +73,9 @@ impl WindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);

let ordered_partition_by_indices =
Expand All @@ -87,7 +84,6 @@ impl WindowAggExec {
input,
window_expr,
schema,
input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
ordered_partition_by_indices,
Expand All @@ -104,11 +100,6 @@ impl WindowAggExec {
&self.input
}

/// Get the input schema before any window functions are applied
pub fn input_schema(&self) -> SchemaRef {
self.input_schema.clone()
}

/// Returns the output sort order of the partition keys. For example,
/// `OVER(PARTITION BY a ORDER BY b)` would give the sort order of column `a`.
// The PARTITION BY columns are always at the beginning of sort_keys.
Expand Down Expand Up @@ -230,7 +221,6 @@ impl ExecutionPlan for WindowAggExec {
Ok(Arc::new(WindowAggExec::try_new(
self.window_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
)?))
}
Expand Down Expand Up @@ -259,7 +249,7 @@ impl ExecutionPlan for WindowAggExec {
fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
let input_cols = self.input_schema.fields().len();
let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
if let Some(input_col_stats) = input_stat.column_statistics {
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,6 @@ message PartiallySortedPartitionSearchMode {
message WindowAggExecNode {
PhysicalPlanNode input = 1;
repeated PhysicalWindowExprNode window_expr = 2;
Schema input_schema = 4;
repeated PhysicalExprNode partition_keys = 5;
// Left unset (`None`) for `WindowAggExec`; set to the search mode for `BoundedWindowAggExec`.
oneof partition_search_mode {
Expand Down
18 changes: 0 additions & 18 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 1 addition & 18 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
runtime,
extension_codec,
)?;
let input_schema = window_agg
.input_schema
.as_ref()
.ok_or_else(|| {
DataFusionError::Internal(
"input_schema in WindowAggrNode is missing.".to_owned(),
)
})?
.clone();
let input_schema: SchemaRef = SchemaRef::new((&input_schema).try_into()?);
let input_schema = input.schema();

let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
.window_expr
Expand Down Expand Up @@ -333,15 +324,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(Arc::new(BoundedWindowAggExec::try_new(
physical_window_expr,
input,
input_schema,
partition_keys,
partition_search_mode,
)?))
} else {
Ok(Arc::new(WindowAggExec::try_new(
physical_window_expr,
input,
input_schema,
partition_keys,
)?))
}
Expand Down Expand Up @@ -1315,8 +1304,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;

let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?;

let window_expr =
exec.window_expr()
.iter()
Expand All @@ -1334,7 +1321,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::WindowAggExecNode {
input: Some(Box::new(input)),
window_expr,
input_schema: Some(input_schema),
partition_keys,
partition_search_mode: None,
},
Expand All @@ -1346,8 +1332,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;

let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?;

let window_expr =
exec.window_expr()
.iter()
Expand Down Expand Up @@ -1385,7 +1369,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::WindowAggExecNode {
input: Some(Box::new(input)),
window_expr,
input_schema: Some(input_schema),
partition_keys,
partition_search_mode: Some(partition_search_mode),
},
Expand Down
1 change: 0 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ fn roundtrip_window() -> Result<()> {
sliding_aggr_window_expr,
],
input,
schema.clone(),
vec![col("b", &schema)?],
)?))
}
Expand Down