Skip to content

Commit

Permalink
Keep track of output equivalence_properties in ProjectionExec
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Feb 1, 2024
1 parent 1097dc0 commit 96ba157
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,14 +603,13 @@ impl ExecutionPlan for AggregateExec {
// First stage aggregation will not change the output partitioning,
// but needs to respect aliases (e.g. mapping in the GROUP BY
// expression).
let input_eq_properties = self.input.equivalence_properties();
// First stage Aggregation will not change the output partitioning but need to respect the Alias
let input_partition = self.input.output_partitioning();
if let Partitioning::Hash(exprs, part) = input_partition {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
input_eq_properties
self.input
.equivalence_properties()
.project_expr(&expr, &self.projection_mapping)
.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
Expand Down
15 changes: 8 additions & 7 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct ProjectionExec {
input: Arc<dyn ExecutionPlan>,
/// The output ordering
output_ordering: Option<Vec<PhysicalSortExpr>>,
/// The output equivalence properties
equivalence_properties: EquivalenceProperties,
/// The mapping used to normalize expressions like Partitioning and
/// PhysicalSortExpr that maps input to output
projection_mapping: ProjectionMapping,
Expand Down Expand Up @@ -96,14 +98,16 @@ impl ProjectionExec {
let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;

let input_eqs = input.equivalence_properties();
let project_eqs = input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = project_eqs.oeq_class().output_ordering();
let equivalence_properties =
input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = equivalence_properties.oeq_class().output_ordering();

Ok(Self {
expr,
schema,
input,
output_ordering,
equivalence_properties,
projection_mapping,
metrics: ExecutionPlanMetricsSet::new(),
})
Expand Down Expand Up @@ -173,12 +177,11 @@ impl ExecutionPlan for ProjectionExec {
fn output_partitioning(&self) -> Partitioning {
// Output partition need to respect the alias
let input_partition = self.input.output_partitioning();
let input_eq_properties = self.input.equivalence_properties();
if let Partitioning::Hash(exprs, part) = input_partition {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
input_eq_properties
self.equivalence_properties
.project_expr(&expr, &self.projection_mapping)
.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
Expand All @@ -201,9 +204,7 @@ impl ExecutionPlan for ProjectionExec {
}

fn equivalence_properties(&self) -> EquivalenceProperties {
self.input
.equivalence_properties()
.project(&self.projection_mapping, self.schema())
self.equivalence_properties.clone()
}

fn with_new_children(
Expand Down

0 comments on commit 96ba157

Please sign in to comment.