Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache output equivalence_properties in ProjectionExec #9097

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,14 +603,12 @@ impl ExecutionPlan for AggregateExec {
// First stage aggregation will not change the output partitioning,
// but needs to respect aliases (e.g. mapping in the GROUP BY
// expression).
let input_eq_properties = self.input.equivalence_properties();
// First stage Aggregation will not change the output partitioning but need to respect the Alias
let input_partition = self.input.output_partitioning();
if let Partitioning::Hash(exprs, part) = input_partition {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
input_eq_properties
self.input
.equivalence_properties()
.project_expr(&expr, &self.projection_mapping)
.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
Expand Down
15 changes: 8 additions & 7 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct ProjectionExec {
input: Arc<dyn ExecutionPlan>,
/// The output ordering
output_ordering: Option<Vec<PhysicalSortExpr>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory this doesn't need to be stored now, since it can be obtained via self.equivalence_properties.oeq_class().output_ordering(), however the last method has some non-trivial computation which are probably best avoided anytime there's a call to ProjectionExec::output_ordering()

/// The output equivalence properties
gruuya marked this conversation as resolved.
Show resolved Hide resolved
equivalence_properties: EquivalenceProperties,
/// The mapping used to normalize expressions like Partitioning and
/// PhysicalSortExpr that maps input to output
projection_mapping: ProjectionMapping,
Expand Down Expand Up @@ -96,14 +98,16 @@ impl ProjectionExec {
let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;

let input_eqs = input.equivalence_properties();
let project_eqs = input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = project_eqs.oeq_class().output_ordering();
let equivalence_properties =
input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = equivalence_properties.oeq_class().output_ordering();

Ok(Self {
expr,
schema,
input,
output_ordering,
equivalence_properties,
projection_mapping,
metrics: ExecutionPlanMetricsSet::new(),
})
Expand Down Expand Up @@ -173,12 +177,11 @@ impl ExecutionPlan for ProjectionExec {
fn output_partitioning(&self) -> Partitioning {
// Output partition need to respect the alias
let input_partition = self.input.output_partitioning();
let input_eq_properties = self.input.equivalence_properties();
if let Partitioning::Hash(exprs, part) = input_partition {
let normalized_exprs = exprs
.into_iter()
.map(|expr| {
input_eq_properties
self.equivalence_properties
.project_expr(&expr, &self.projection_mapping)
.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
Expand All @@ -201,9 +204,7 @@ impl ExecutionPlan for ProjectionExec {
}

fn equivalence_properties(&self) -> EquivalenceProperties {
self.input
.equivalence_properties()
.project(&self.projection_mapping, self.schema())
self.equivalence_properties.clone()
}

fn with_new_children(
Expand Down
28 changes: 15 additions & 13 deletions datafusion/sqllogictest/test_files/predicates.slt
Original file line number Diff line number Diff line change
Expand Up @@ -732,20 +732,22 @@ AggregateExec: mode=SinglePartitioned, gby=[p_partkey@2 as p_partkey], aggr=[SUM
--ProjectionExec: expr=[l_extendedprice@0 as l_extendedprice, l_discount@1 as l_discount, p_partkey@2 as p_partkey, ps_suppkey@4 as ps_suppkey]
----CoalesceBatchesExec: target_batch_size=8192
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, ps_partkey@0)]
--------ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, p_partkey@3 as p_partkey]
----------CoalesceBatchesExec: target_batch_size=8192
------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)]
--------------CoalesceBatchesExec: target_batch_size=8192
----------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4
------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_partkey, l_extendedprice, l_discount], has_header=true
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([p_partkey@2], 4), input_partitions=4
gruuya marked this conversation as resolved.
Show resolved Hide resolved
------------ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, p_partkey@3 as p_partkey]
--------------CoalesceBatchesExec: target_batch_size=8192
----------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4
------------------ProjectionExec: expr=[p_partkey@0 as p_partkey]
--------------------CoalesceBatchesExec: target_batch_size=8192
----------------------FilterExec: p_brand@1 = Brand#12 OR p_brand@1 = Brand#23
------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/part.csv]]}, projection=[p_partkey, p_brand], has_header=true
----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)]
------------------CoalesceBatchesExec: target_batch_size=8192
--------------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4
----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, projection=[l_partkey, l_extendedprice, l_discount], has_header=true
------------------CoalesceBatchesExec: target_batch_size=8192
--------------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4
----------------------ProjectionExec: expr=[p_partkey@0 as p_partkey]
------------------------CoalesceBatchesExec: target_batch_size=8192
--------------------------FilterExec: p_brand@1 = Brand#12 OR p_brand@1 = Brand#23
----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/part.csv]]}, projection=[p_partkey, p_brand], has_header=true
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
Expand Down
Loading