Skip to content

Commit

Permalink
fix: issue apache#9130 substitute redundant columns when doing cross …
Browse files Browse the repository at this point in the history
…join
  • Loading branch information
Lordworms committed Feb 7, 2024
1 parent d1aca48 commit b6f0f21
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 7 deletions.
3 changes: 2 additions & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ async fn exec_and_print(

let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
// println!("cur statement is {:?}", statement);
let plan = create_plan(ctx, statement).await?;

// For plans like `Explain` ignore `MaxRows` option and always display all rows
Expand All @@ -233,7 +234,7 @@ async fn exec_and_print(
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
);

// println!("the final logical plan is {:?}", plan);
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ impl MemTable {
pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
for batches in partitions.iter().flatten() {
let batches_schema = batches.schema();
println!(
"the new schema is {:?}, schema set is {:?}",
batches_schema, schema
);
if !schema.contains(&batches_schema) {
debug!(
"mem table schema does not contain batches schema. \
Expand Down
39 changes: 37 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::{
TableProviderFilterPushDown, TableSource, WriteOp,
};

use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::{
get_target_functional_dependencies, plan_datafusion_err, plan_err, Column, DFField,
Expand Down Expand Up @@ -890,6 +890,7 @@ impl LogicalPlanBuilder {
pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &JoinType::Inner)?;
println!("left is {:?} \n right is {:?}", self.plan, right);
Ok(Self::from(LogicalPlan::CrossJoin(CrossJoin {
left: Arc::new(self.plan),
right: Arc::new(right),
Expand Down Expand Up @@ -1124,7 +1125,28 @@ impl LogicalPlanBuilder {
)?))
}
}

pub fn change_redundant_column(fields: Vec<DFField>) -> Vec<DFField> {
let mut name_map = HashMap::new();
fields
.into_iter()
.map(|field| {
if !name_map.contains_key(field.name()) {
name_map.insert(field.name().to_string(), 0);
field
} else {
let cur_cnt = &name_map.get(field.name());
let name = field.name().to_string() + ":" + &cur_cnt.unwrap().to_string();
name_map.insert(field.name().to_string(), cur_cnt.unwrap() + 1);
DFField::new(
field.qualifier().cloned(),
&name,
field.data_type().clone(),
field.is_nullable(),
)
}
})
.collect()
}
/// Creates a schema for a join operation.
/// The fields from the left side are first
pub fn build_join_schema(
Expand Down Expand Up @@ -1184,13 +1206,16 @@ pub fn build_join_schema(
right_fields.clone()
}
};
//println!("total fields is {:?}", fields);
let func_dependencies = left.functional_dependencies().join(
right.functional_dependencies(),
join_type,
left_fields.len(),
);
// println!("func_dependencies is {:?}", func_dependencies);
let mut metadata = left.metadata().clone();
metadata.extend(right.metadata().clone());
// let schema = DFSchema::new_with_metadata(change_redundant_column(fields), metadata)?;
let schema = DFSchema::new_with_metadata(fields, metadata)?;
schema.with_functional_dependencies(func_dependencies)
}
Expand Down Expand Up @@ -1231,12 +1256,16 @@ fn add_group_by_exprs_from_dependencies(
}
Ok(group_expr)
}
pub(crate) fn validate_unique_names_with_table<'a>() {
unimplemented!()
}
/// Errors if one or more expressions have equal names.
pub(crate) fn validate_unique_names<'a>(
node_name: &str,
expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<()> {
let mut unique_names = HashMap::new();

expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
let name = expr.display_name()?;
match unique_names.get(&name) {
Expand All @@ -1245,6 +1274,7 @@ pub(crate) fn validate_unique_names<'a>(
Ok(())
},
Some((existing_position, existing_expr)) => {
//println!("node_name is {}, existing expr is {:?}", node_name, existing_expr);
plan_err!("{node_name} require unique expression names \
but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
at position {position} have the same name. Consider aliasing (\"AS\") one of them."
Expand Down Expand Up @@ -1360,6 +1390,7 @@ pub fn project(
let mut projected_expr = vec![];
for e in expr {
let e = e.into();
//println!("cur_expression is {:?}", e);
match e {
Expr::Wildcard { qualifier: None } => {
projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
Expand All @@ -1375,6 +1406,10 @@ pub fn project(
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
}
}
// println!(
// "before validation the projection name is {:?} \n and the expression is {:?}",
// plan, projected_expr
// );
validate_unique_names("Projections", projected_expr.iter())?;

Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
Expand Down
11 changes: 10 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;

use super::dml::CopyTo;
use super::DdlStatement;
use crate::builder::change_redundant_column;
use crate::dml::CopyOptions;
use crate::expr::{
Alias, Exists, InSubquery, Placeholder, Sort as SortExpr, WindowFunction,
Expand Down Expand Up @@ -1891,7 +1892,11 @@ impl SubqueryAlias {
alias: impl Into<OwnedTableReference>,
) -> Result<Self> {
let alias = alias.into();
let schema: Schema = plan.schema().as_ref().clone().into();
let fields = change_redundant_column(plan.schema().fields().clone());
let meta_data = plan.schema().as_ref().metadata().clone();
let schema: Schema = DFSchema::new_with_metadata(fields, meta_data)
.unwrap()
.into();
// Since schema is the same, other than qualifier, we can use existing
// functional dependencies:
let func_dependencies = plan.schema().functional_dependencies().clone();
Expand Down Expand Up @@ -2181,6 +2186,10 @@ impl TableScan {
df_schema.with_functional_dependencies(func_dependencies)
})?;
let projected_schema = Arc::new(projected_schema);
// println!(
// "projected_schema is {:?} \n and projection is {:?}",
// projected_schema, projection,
// );
Ok(Self {
table_name,
source: table_source,
Expand Down
9 changes: 6 additions & 3 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if !select.sort_by.is_empty() {
return not_impl_err!("SORT BY");
}

// println!("select from is {:?}", select.from);
// println!("current planner_context is {:?}", planner_context);
// process `from` clause
let plan = self.plan_from_tables(select.from, planner_context)?;
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
Expand All @@ -77,15 +78,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// handle named windows before processing the projection expression
check_conflicting_windows(&select.named_window)?;
match_window_definitions(&mut select.projection, &select.named_window)?;

// process the SELECT expressions, with wildcards expanded.
let select_exprs = self.prepare_select_exprs(
&base_plan,
select.projection,
empty_from,
planner_context,
)?;

// println!(
// "base plan is {:?} \n select expression is {:?} \n planner_context is {:?}",
// base_plan, select_exprs, planner_context
// );
// having and group by clause may reference aliases defined in select projection
let projected_plan = self.project(base_plan.clone(), select_exprs.clone())?;
let mut combined_schema = (**projected_plan.schema()).clone();
Expand Down

0 comments on commit b6f0f21

Please sign in to comment.