From 8258fafb34d8f73d77d0ae3200ca6430a5032795 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Sun, 23 Jun 2024 17:58:23 +0900 Subject: [PATCH 1/4] Add test cases for issue --- datafusion/expr/src/logical_plan/builder.rs | 25 +++++++++++++++++++++ datafusion/sql/tests/cases/plan_to_sql.rs | 14 +++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2f1ece32ab15..25f2d9aa619a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -2255,4 +2255,29 @@ mod tests { Ok(()) } + + #[test] + fn test_union_strips_qualifiers() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("foo", DataType::Int32, false), + Field::new("bar", DataType::Int32, false), + ]); + let result = table_scan(Some("t1"), &schema, None)? + .union(table_scan(Some("t2"), &schema, None)?.build()?)? + .build()?; + + let LogicalPlan::Union(union) = result else { + panic!("expected union, got {result:?}") + }; + + assert!( + union + .schema + .iter() + .all(|(qualifier, _)| qualifier.is_none()), + "Expected the schema from a Union to not have any table qualifiers" + ); + + Ok(()) + } } diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 33e28e7056b9..374403d853f9 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -230,6 +230,16 @@ fn roundtrip_statement_with_dialect() -> Result<()> { parser_dialect: Box::new(GenericDialect {}), unparser_dialect: Box::new(UnparserDefaultDialect {}), }, + TestStatementWithDialect { + sql: "SELECT j1_id FROM j1 + UNION ALL + SELECT tb.j2_id as j1_id FROM j2 tb + ORDER BY j1_id + LIMIT 10;", + expected: r#"SELECT j1.j1_id FROM j1 UNION ALL SELECT tb.j2_id AS j1_id FROM j2 AS tb ORDER BY j1_id ASC NULLS LAST LIMIT 10"#, + parser_dialect: Box::new(GenericDialect {}), + unparser_dialect: Box::new(UnparserDefaultDialect {}), + }, ]; for query in tests { @@ -239,7 +249,9 @@ fn roundtrip_statement_with_dialect() -> Result<()> { let context = MockContextProvider::default(); let sql_to_rel = SqlToRel::new(&context); - let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); + let plan = sql_to_rel + .sql_statement_to_plan(statement) + .unwrap_or_else(|e| panic!("Failed to parse sql: {}\n{e}", query.sql)); let unparser = Unparser::new(&*query.unparser_dialect); let roundtrip_statement = unparser.plan_to_sql(&plan)?; From 103ae3d498e38f77aaf87ad0c408a3c62401c916 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 24 Jun 2024 00:17:00 +0900 Subject: [PATCH 2/4] Remove test from logical_plan/builder.rs --- datafusion/expr/src/logical_plan/builder.rs | 25 --------------------- 1 file changed, 25 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 25f2d9aa619a..2f1ece32ab15 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -2255,29 +2255,4 @@ mod tests { Ok(()) } - - #[test] - fn test_union_strips_qualifiers() -> Result<()> { - let schema = Schema::new(vec![ - Field::new("foo", DataType::Int32, false), - Field::new("bar", DataType::Int32, false), - ]); - let result = table_scan(Some("t1"), &schema, None)? - .union(table_scan(Some("t2"), &schema, None)?.build()?)? - .build()?; - - let LogicalPlan::Union(union) = result else { - panic!("expected union, got {result:?}") - }; - - assert!( - union - .schema - .iter() - .all(|(qualifier, _)| qualifier.is_none()), - "Expected the schema from a Union to not have any table qualifiers" - ); - - Ok(()) - } } From 01c431424fa55b119ca572d047a2cd478cf5732d Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 24 Jun 2024 01:21:18 +0900 Subject: [PATCH 3/4] Remove table qualifiers for sorts following a Union --- datafusion/sql/src/unparser/mod.rs | 1 + datafusion/sql/src/unparser/plan.rs | 14 +++- datafusion/sql/src/unparser/rewrite.rs | 96 ++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 datafusion/sql/src/unparser/rewrite.rs diff --git a/datafusion/sql/src/unparser/mod.rs b/datafusion/sql/src/unparser/mod.rs index fb0285901c3f..fbbed4972b17 100644 --- a/datafusion/sql/src/unparser/mod.rs +++ b/datafusion/sql/src/unparser/mod.rs @@ -18,6 +18,7 @@ mod ast; mod expr; mod plan; +mod rewrite; mod utils; pub use expr::expr_to_sql; diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index a4a457f51dc9..ef8b56c41df1 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, Result}; +use datafusion_common::{ + internal_err, not_impl_err, plan_err, + tree_node::{TransformedResult, TreeNode}, + DataFusionError, Result, +}; use datafusion_expr::{ expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, Projection, }; @@ -28,6 +32,7 @@ use super::{ BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder, SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder, }, + rewrite::NormalizeUnionSchema, utils::{find_agg_node_within_select, unproject_window_exprs, AggVariant}, Unparser, }; @@ -63,6 +68,9 @@ pub fn plan_to_sql(plan: &LogicalPlan) -> Result { impl Unparser<'_> { pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result { + let mut union_schema_normalizer = NormalizeUnionSchema::new(); + let plan = plan.clone().rewrite(&mut union_schema_normalizer).data()?; + match plan { LogicalPlan::Projection(_) | LogicalPlan::Filter(_) @@ -80,8 +88,8 @@ impl Unparser<'_> { | LogicalPlan::Limit(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) - | LogicalPlan::Distinct(_) => self.select_to_sql_statement(plan), - LogicalPlan::Dml(_) => self.dml_to_sql(plan), + | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan), + LogicalPlan::Dml(_) => self.dml_to_sql(&plan), LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::Extension(_) diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs new file mode 100644 index 000000000000..f43c56b29f23 --- /dev/null +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::{ + tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter}, + Result, +}; +use datafusion_expr::{Expr, LogicalPlan, Sort}; + +/// Normalize the schema of a union plan to remove qualifiers from the schema fields. +pub(super) struct NormalizeUnionSchema {} + +impl NormalizeUnionSchema { + pub fn new() -> Self { + Self {} + } +} + +impl TreeNodeRewriter for NormalizeUnionSchema { + type Node = LogicalPlan; + + /// Invoked while traversing down the tree before any children are rewritten. + /// Default implementation returns the node as is and continues recursion. + fn f_down(&mut self, plan: LogicalPlan) -> Result> { + match plan { + LogicalPlan::Union(mut union) => { + let schema = match Arc::try_unwrap(union.schema) { + Ok(inner) => inner, + Err(schema) => (*schema).clone(), + }; + let schema = schema.strip_qualifiers(); + + union.schema = Arc::new(schema); + Ok(Transformed::yes(LogicalPlan::Union(union))) + } + LogicalPlan::Sort(sort) => { + if !matches!(&*sort.input, LogicalPlan::Union(_)) { + return Ok(Transformed::no(LogicalPlan::Sort(sort))); + } + + let mut sort_expr_rewriter = NormalizeSortExprForUnion::new(); + let sort_exprs: Vec = sort + .expr + .into_iter() + .map(|expr| expr.rewrite(&mut sort_expr_rewriter).data()) + .collect::>>()?; + + Ok(Transformed::yes(LogicalPlan::Sort(Sort { + expr: sort_exprs, + input: sort.input, + fetch: sort.fetch, + }))) + } + _ => Ok(Transformed::no(plan)), + } + } +} + +/// Normalize the schema of sort expressions that follow a Union to remove any column relations. +struct NormalizeSortExprForUnion {} + +impl NormalizeSortExprForUnion { + pub fn new() -> Self { + Self {} + } +} + +impl TreeNodeRewriter for NormalizeSortExprForUnion { + type Node = Expr; + + fn f_down(&mut self, expr: Expr) -> Result> { + match expr { + Expr::Column(mut col) => { + col.relation = None; + Ok(Transformed::yes(Expr::Column(col))) + } + _ => Ok(Transformed::no(expr)), + } + } +} From b38de4f340d91f1a2c7f4af868c4a87b94f38dc4 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Mon, 24 Jun 2024 12:35:21 +0900 Subject: [PATCH 4/4] Use transform_up and add more documentation on why this is needed --- datafusion/sql/src/unparser/plan.rs | 11 +-- datafusion/sql/src/unparser/rewrite.rs | 131 +++++++++++++------------ 2 files changed, 71 insertions(+), 71 deletions(-) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index ef8b56c41df1..15137403c582 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -15,11 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::{ - internal_err, not_impl_err, plan_err, - tree_node::{TransformedResult, TreeNode}, - DataFusionError, Result, -}; +use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, Result}; use datafusion_expr::{ expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, Projection, }; @@ -32,7 +28,7 @@ use super::{ BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder, SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder, }, - rewrite::NormalizeUnionSchema, + rewrite::normalize_union_schema, utils::{find_agg_node_within_select, unproject_window_exprs, AggVariant}, Unparser, }; @@ -68,8 +64,7 @@ pub fn plan_to_sql(plan: &LogicalPlan) -> Result { impl Unparser<'_> { pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result { - let mut union_schema_normalizer = NormalizeUnionSchema::new(); - let plan = plan.clone().rewrite(&mut union_schema_normalizer).data()?; + let plan = normalize_union_schema(plan)?; match plan { LogicalPlan::Projection(_) diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index f43c56b29f23..a73fce30ced3 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -18,79 +18,84 @@ use std::sync::Arc; use datafusion_common::{ - tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter}, + tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeIterator}, Result, }; use datafusion_expr::{Expr, LogicalPlan, Sort}; -/// Normalize the schema of a union plan to remove qualifiers from the schema fields. -pub(super) struct NormalizeUnionSchema {} +/// Normalize the schema of a union plan to remove qualifiers from the schema fields and sort expressions. +/// +/// DataFusion will return an error if two columns in the schema have the same name with no table qualifiers. +/// There are certain types of UNION queries that can result in having two columns with the same name, and the +/// solution was to add table qualifiers to the schema fields. +/// See for more context on this decision. +/// +/// However, this causes a problem when unparsing these queries back to SQL - as the table qualifier has +/// logically been erased and is no longer a valid reference. +/// +/// The following input SQL: +/// ```sql +/// SELECT table1.foo FROM table1 +/// UNION ALL +/// SELECT table2.foo FROM table2 +/// ORDER BY foo +/// ``` +/// +/// Would be unparsed into the following invalid SQL without this transformation: +/// ```sql +/// SELECT table1.foo FROM table1 +/// UNION ALL +/// SELECT table2.foo FROM table2 +/// ORDER BY table1.foo +/// ``` +/// +/// Which would result in a SQL error, as `table1.foo` is not a valid reference in the context of the UNION. +pub(super) fn normalize_union_schema(plan: &LogicalPlan) -> Result { + let plan = plan.clone(); -impl NormalizeUnionSchema { - pub fn new() -> Self { - Self {} - } -} - -impl TreeNodeRewriter for NormalizeUnionSchema { - type Node = LogicalPlan; + let transformed_plan = plan.transform_up(|plan| match plan { + LogicalPlan::Union(mut union) => { + let schema = match Arc::try_unwrap(union.schema) { + Ok(inner) => inner, + Err(schema) => (*schema).clone(), + }; + let schema = schema.strip_qualifiers(); - /// Invoked while traversing down the tree before any children are rewritten. - /// Default implementation returns the node as is and continues recursion. - fn f_down(&mut self, plan: LogicalPlan) -> Result> { - match plan { - LogicalPlan::Union(mut union) => { - let schema = match Arc::try_unwrap(union.schema) { - Ok(inner) => inner, - Err(schema) => (*schema).clone(), - }; - let schema = schema.strip_qualifiers(); - - union.schema = Arc::new(schema); - Ok(Transformed::yes(LogicalPlan::Union(union))) + union.schema = Arc::new(schema); + Ok(Transformed::yes(LogicalPlan::Union(union))) + } + LogicalPlan::Sort(sort) => { + // Only rewrite Sort expressions that have a UNION as their input + if !matches!(&*sort.input, LogicalPlan::Union(_)) { + return Ok(Transformed::no(LogicalPlan::Sort(sort))); } - LogicalPlan::Sort(sort) => { - if !matches!(&*sort.input, LogicalPlan::Union(_)) { - return Ok(Transformed::no(LogicalPlan::Sort(sort))); - } - - let mut sort_expr_rewriter = NormalizeSortExprForUnion::new(); - let sort_exprs: Vec = sort - .expr - .into_iter() - .map(|expr| expr.rewrite(&mut sort_expr_rewriter).data()) - .collect::>>()?; - Ok(Transformed::yes(LogicalPlan::Sort(Sort { - expr: sort_exprs, - input: sort.input, - fetch: sort.fetch, - }))) - } - _ => Ok(Transformed::no(plan)), + Ok(Transformed::yes(LogicalPlan::Sort(Sort { + expr: rewrite_sort_expr_for_union(sort.expr)?, + input: sort.input, + fetch: sort.fetch, + }))) } - } + _ => Ok(Transformed::no(plan)), + }); + transformed_plan.data() } -/// Normalize the schema of sort expressions that follow a Union to remove any column relations. -struct NormalizeSortExprForUnion {} - -impl NormalizeSortExprForUnion { - pub fn new() -> Self { - Self {} - } -} - -impl TreeNodeRewriter for NormalizeSortExprForUnion { - type Node = Expr; +/// Rewrite sort expressions that have a UNION plan as their input to remove the table reference. +fn rewrite_sort_expr_for_union(exprs: Vec) -> Result> { + let sort_exprs: Vec = exprs + .into_iter() + .map_until_stop_and_collect(|expr| { + expr.transform_up(|expr| { + if let Expr::Column(mut col) = expr { + col.relation = None; + Ok(Transformed::yes(Expr::Column(col))) + } else { + Ok(Transformed::no(expr)) + } + }) + }) + .data()?; - fn f_down(&mut self, expr: Expr) -> Result> { - match expr { - Expr::Column(mut col) => { - col.relation = None; - Ok(Transformed::yes(Expr::Column(col))) - } - _ => Ok(Transformed::no(expr)), - } - } + Ok(sort_exprs) }