Skip to content

Commit

Permalink
Union schema can't be a subset of the child schema (apache#8408)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
2 people authored and appletreeisyellow committed Dec 15, 2023
1 parent 5c0c619 commit 584bc3c
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 42 deletions.
11 changes: 2 additions & 9 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -919,17 +919,10 @@ impl DefaultPhysicalPlanner {
)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
}
LogicalPlan::Union(Union { inputs, schema }) => {
LogicalPlan::Union(Union { inputs, .. }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;

if schema.fields().len() < physical_plans[0].schema().fields().len() {
// `schema` could be a subset of the child schema. For example
// for query "select count(*) from (select a from t union all select a from t)"
// `schema` is empty but child schema contains one field `a`.
Ok(Arc::new(UnionExec::try_new_with_schema(physical_plans, schema.clone())?))
} else {
Ok(Arc::new(UnionExec::new(physical_plans)))
}
Ok(Arc::new(UnionExec::new(physical_plans)))
}
LogicalPlan::Repartition(Repartition {
input,
Expand Down
34 changes: 1 addition & 33 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::stream::ObservedStream;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, internal_err, DFSchemaRef, DataFusionError, Result};
use datafusion_common::{exec_err, internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

Expand Down Expand Up @@ -95,38 +95,6 @@ pub struct UnionExec {
}

impl UnionExec {
/// Create a new UnionExec with specified schema.
/// The `schema` should always be a subset of the schema of `inputs`,
/// otherwise, an error will be returned.
pub fn try_new_with_schema(
inputs: Vec<Arc<dyn ExecutionPlan>>,
schema: DFSchemaRef,
) -> Result<Self> {
let mut exec = Self::new(inputs);
let exec_schema = exec.schema();
let fields = schema
.fields()
.iter()
.map(|dff| {
exec_schema
.field_with_name(dff.name())
.cloned()
.map_err(|_| {
DataFusionError::Internal(format!(
"Cannot find the field {:?} in child schema",
dff.name()
))
})
})
.collect::<Result<Vec<Field>>>()?;
let schema = Arc::new(Schema::new_with_metadata(
fields,
exec.schema().metadata().clone(),
));
exec.schema = schema;
Ok(exec)
}

/// Create a new UnionExec
pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
let schema = union_schema(&inputs);
Expand Down
5 changes: 5 additions & 0 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ SELECT 2 as x
1
2

query I
select count(*) from (select id from t1 union all select id from t2)
----
6

# csv_union_all
statement ok
CREATE EXTERNAL TABLE aggregate_test_100 (
Expand Down

0 comments on commit 584bc3c

Please sign in to comment.