
A More General Approach for Optimizing Projections in Physical Plans #9111

Open
berkaysynnada opened this issue Feb 2, 2024 · 9 comments
Labels
enhancement New feature or request

Comments

@berkaysynnada
Contributor

berkaysynnada commented Feb 2, 2024

Is your feature request related to a problem or challenge?

In the current version of ProjectionPushdown, there are some algorithmic limitations, and the rule is not easy to extend. Solving this optimization to its theoretical limits within a "pushing down" strategy is not possible; we need another approach. In addition, any custom plan should be easy to integrate with this optimization.

Describe the solution you'd like

The new rule aims to achieve the most effective use of projections in plans. It ensures that query plans are free from unnecessary projections and that no unused columns are propagated between plans. The rule is designed to enhance query performance by:

  1. Preventing the transfer of unused columns from leaves to root.
  2. Ensuring projections are used only when they contribute to narrowing the schema, or when necessary for evaluation or aliasing.

The optimization is conducted in two phases:

Top-down Phase:

  • Traverses the plan from root to leaves. If the node is a:
    1. Projection node, it may:
      a) Merge it with its input projection if the merge is beneficial.
      b) Remove the projection if it is redundant.
      c) Narrow the projection if possible.
      d) Nest the projection into the source.
      e) Do nothing, otherwise.
    2. Non-projection node, either:
      a) The schema needs pruning: insert the necessary projections into its children.
      b) All fields are required: do nothing.

Bottom-up Phase:

This pass is required because modifying a plan node can change the column indices used by downstream nodes. When such a change occurs, we record the old and new indices of the affected columns in the node's state. We then proceed from the leaves to the root, updating column indices in the plans by consulting these mapping records. Some unnecessary projections may also emerge after the top-down phase. When a projection examines its input's schema mapping, it can remove itself and assign a new schema mapping to the node that was formerly its input.

The designed node structure is:

struct ProjectionOptimizer {
    pub plan: Arc<dyn ExecutionPlan>,
    /// The node above expects it can reach these columns.
    pub required_columns: HashSet<Column>,
    /// The nodes above will be updated according to these matches. The key is
    /// the initial column, and the value is its updated version.
    pub schema_mapping: HashMap<Column, Column>,
    pub children_nodes: Vec<ProjectionOptimizer>,
}

To summarize: with two state variables per plan node (one for transferring the required columns, one for notifying changes of column indices), and with two passes (effectively one pass, since the bottom-up pass happens implicitly while attaching the transformed children to the node itself), we will have a future-proof projection optimizer rule.
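The bottom-up index-fixing step described above can be sketched in miniature. The helper below is hypothetical (the real rule rewrites `Column` expressions, not bare indices): a parent node holds column references by index, a rewritten child reports an old-index → new-index map, and the parent remaps its references accordingly.

```rust
use std::collections::HashMap;

/// Toy version of applying a child's `schema_mapping` to a parent's column
/// references. Any index not present in the mapping is left unchanged.
fn apply_schema_mapping(parent_refs: &mut Vec<usize>, mapping: &HashMap<usize, usize>) {
    for r in parent_refs.iter_mut() {
        if let Some(&new_idx) = mapping.get(r) {
            *r = new_idx;
        }
    }
}
```

For example, if a projection inserted below the parent moved column 3 to position 1 and column 5 to position 2, the mapping `{3: 1, 5: 2}` rewrites the parent's references `[0, 3, 5]` into `[0, 1, 2]`.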

Describe alternatives you've considered

No response

Additional context

I am currently working on this issue and plan to open a PR for suggestions, especially on how to update the ExecutionPlan API to get rid of the if/else structure over all plans. It will likely be ready next week. It is not expected to significantly alter our current plans, but it will be a solid step towards optimizing potential outcomes following other existing optimizations and future ones.

@alamb
Contributor

alamb commented Feb 2, 2024

(copying my comment from elsewhere)

FWIW there is a version of pushdown in InfluxDB (written by @crepererum ) that may serve as a useful reference https://github.com/influxdata/influxdb/blob/main/iox_query/src/physical_optimizer/projection_pushdown.rs

@alamb
Contributor

alamb commented Feb 2, 2024

I have two questions:

  1. Do you know of any examples of "algorithmic limitations" (e.g. plans where unnecessary columns are carried through)?
  2. How does this compare to how pushdown is done during LogicalPlanning (e.g.
    https://github.com/apache/arrow-datafusion/blob/main/datafusion/optimizer/src/push_down_projection.rs). Are you planning changes / extensions to that logic too? One of the reasons to do pushdown at that level is that it is less complicated (e.g. output indexes aren't used)

The idea of supporting ProjectionPushdown for user-defined plans sounds like a good idea to me 👍 . If that is indeed the use case, adding a test showing it doesn't work today and works after your changes is 🆗

I will plan to open a PR for suggestions, especially on how to update the ExecutionPlan API to get rid of if else structure of all plans.

I assume you are referring to this code:
https://github.com/apache/arrow-datafusion/blob/7641a3228156aab0e48c4bab5a6834b44f722d89/datafusion/core/src/physical_optimizer/projection_pushdown.rs#L105-L143

Maybe you could add an API like this:

trait ExecutionPlan {
...
  /// Return a copy of this plan that only produces the specified outputs, if possible, in the specified order.
  /// The projection is a BTreeSet as it is unique and in increasing order (some nodes like HashAggregateExec
  /// can't reorder their outputs).
  ///
  /// For plans such as `ProjectionExec`, projecting a subset of columns will reduce the
  /// expression list.
  ///
  /// For some plans such as `HashAggregateExec`, projection may not be possible (e.g. it is not possible to remove
  /// group columns from the output).
  ///
  /// By default, returns Ok(None)
  fn try_project(&self, projection: &BTreeSet<usize>) -> Result<Option<Arc<dyn ExecutionPlan>>> { Ok(None) }
...

🤔
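To illustrate the semantics such a `try_project` could have, here is a standalone sketch on a simplified stand-in type (the `Projection` struct below is hypothetical, not the real `ProjectionExec`): keep only the requested output columns, in increasing index order, and decline when the request is invalid.

```rust
use std::collections::BTreeSet;

/// Simplified stand-in for a projection node: one output expression per column.
#[derive(Clone, Debug)]
struct Projection {
    exprs: Vec<String>,
}

impl Projection {
    /// Sketch of the proposed `try_project`: return a narrowed copy that
    /// produces only the requested output columns, or `None` if any
    /// requested index is out of range.
    fn try_project(&self, projection: &BTreeSet<usize>) -> Option<Projection> {
        if projection.iter().any(|&i| i >= self.exprs.len()) {
            return None; // cannot satisfy the request
        }
        let exprs = projection.iter().map(|&i| self.exprs[i].clone()).collect();
        Some(Projection { exprs })
    }
}
```

Because `BTreeSet` iterates in ascending order, the surviving expressions keep their relative order, matching the "in increasing order" constraint in the doc comment.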

@berkaysynnada
Contributor Author

I have two questions:

  1. Do you know of any examples of "algorithmic limitations" (e.g. plans where unnecessary columns are carried through)?
  2. How does this compare to how pushdown is done during LogicalPlanning (e.g.
    https://github.com/apache/arrow-datafusion/blob/main/datafusion/optimizer/src/push_down_projection.rs). Are you planning changes / extensions to that logic too? One of the reasons to do pushdown at that level is that it is less complicated (e.g. output indexes aren't used)
  1. What I meant by algorithmic limitations was about the current rule. The main problem occurs in a situation like this:
    ProjectionA <- OperatorA <- OperatorB
    The pushdown mechanism tries to push the projection between OperatorA and OperatorB. When it cannot push it down, the rule gives up. However, there are cases where the best solution looks like this:
    ProjectionB <- OperatorA <- ProjectionC <- OperatorB

Another problem occurs when a projection is inserted into the input of some operator to decrease the load; however, that projection insertion may change the indices of some columns in the downstream operators. In such a case, all operators subject to a column index change must be rewritten.
I would like to eliminate those kinds of restrictions by extending the capabilities of the current rule, and I believe the pushdown mechanism falls short of that.

  2. I haven't deeply examined the rule in the logical plan, but I know the rule there is less complicated. However, after all the rules have run on a physical plan, it would be comforting for such a rule to double-check the plan.

Maybe you could add an API like this:

The new version's API requirements are a little different. I will ask for your opinions again when the PR is ready.
Thanks for your evaluation and suggestions.

@alamb
Contributor

alamb commented Feb 5, 2024

I haven't deeply examined the rule in the logical plan, but I know the rule there is less complicated. However, after all the rules have run on a physical plan, it would be comforting for such a rule to double-check the plan.

I was thinking the LogicalPlan pushdown rule might be interesting to you, as it implements the type of pushdown you describe (it pushes only the columns that are needed and doesn't have to push them all).

Perhaps you can avoid having to invent a new algorithm.

@berkaysynnada
Contributor Author

If I remember correctly, the LogicalPlan rule works with qualified column names. Suppose there is a plan such as:
A <- B <- C
and B refers to SomeColumn at index 2. If a projection is inserted between B and C, and SomeColumn then comes from index 1, we need to update both operator A and operator B. All expressions may need to be rewritten when the plan changes because of these kinds of index changes. However, LogicalPlans do not deal with indices. Otherwise, you are right: some kind of partial pushdown could be enough with a few iterations on the current rule.
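The index shift above can be made concrete with a small sketch. These helpers are hypothetical (not DataFusion API): inserting a projection that keeps only the `required` columns (in sorted order) induces an old-index → new-index mapping, which the downstream operators must then apply to their own column references.

```rust
use std::collections::HashMap;

/// Build the old-index -> new-index mapping induced by inserting a
/// projection that keeps only `required` (given in ascending order).
fn index_mapping(required: &[usize]) -> HashMap<usize, usize> {
    required
        .iter()
        .enumerate()
        .map(|(new_idx, &old_idx)| (old_idx, new_idx))
        .collect()
}

/// Rewrite a downstream operator's column references through the mapping.
/// Panics if a reference was not kept by the projection (a rule bug).
fn rewrite_indices(refs: &[usize], mapping: &HashMap<usize, usize>) -> Vec<usize> {
    refs.iter().map(|i| mapping[i]).collect()
}
```

In the A <- B <- C example: if only columns {1, 2} survive the inserted projection, SomeColumn moves from old index 2 to new index 1, and every operator above must rewrite its references through that mapping.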

@alamb
Contributor

alamb commented Feb 5, 2024

All expressions may need to be rewritten when the plan changes because of these kinds of index changes.

Right -- I think the difference is that in the Physical realm, projection pushdown could maintain the mapping of "is output column N used" where N is the index.

Then the trick will be to implement the transformation for each type of ExecutionPlan to both:

  1. Rewrite itself if one of its output columns is not used
  2. Describe which of the columns on the input are required to create its output

With these two functions I think you could implement a pretty simple pushdown rule.
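The two functions above could be expressed as a trait implemented per operator. The sketch below is an assumption-laden illustration (trait and type names are hypothetical, not the ExecutionPlan API), using a toy filter node whose predicate reads some input columns and which passes a subset of input columns through to its output.

```rust
use std::collections::BTreeSet;

/// Hypothetical per-operator projection support, per the two functions above.
trait ProjectionSupport: Sized {
    /// 1. Rewrite this node so it only produces the used output columns.
    fn rewrite_for_outputs(&self, used: &BTreeSet<usize>) -> Option<Self>;
    /// 2. Which input columns are required to create this node's output.
    fn required_inputs(&self) -> BTreeSet<usize>;
}

/// Toy filter node: the predicate reads `predicate_cols` from the input,
/// and `output_cols[i]` is the input column behind output position `i`.
struct Filter {
    predicate_cols: BTreeSet<usize>,
    output_cols: Vec<usize>,
}

impl ProjectionSupport for Filter {
    fn rewrite_for_outputs(&self, used: &BTreeSet<usize>) -> Option<Self> {
        // Drop unused output positions; the predicate is unaffected.
        let output_cols = self
            .output_cols
            .iter()
            .enumerate()
            .filter(|(i, _)| used.contains(i))
            .map(|(_, &c)| c)
            .collect();
        Some(Filter { predicate_cols: self.predicate_cols.clone(), output_cols })
    }

    fn required_inputs(&self) -> BTreeSet<usize> {
        // Predicate columns plus whatever still flows through to the output.
        self.predicate_cols
            .iter()
            .copied()
            .chain(self.output_cols.iter().copied())
            .collect()
    }
}
```

Note how the two methods compose into a pushdown: prune the node's outputs first, then ask it which inputs remain required, and recurse into its children with that set.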

@berkaysynnada
Contributor Author

Yes, as you said: in short, the rule consists of tracking the required columns across the plan and rewriting the plans with new expressions that have updated indices.

@my-vegetable-has-exploded
Contributor

If I remember correctly, the LogicalPlan rule works with qualified column names. Suppose there is a plan such as: A <- B <- C, and B refers to SomeColumn at index 2. If a projection is inserted between B and C, and SomeColumn then comes from index 1, we need to update both operator A and operator B. All expressions may need to be rewritten when the plan changes because of these kinds of index changes. However, LogicalPlans do not deal with indices. Otherwise, you are right: some kind of partial pushdown could be enough with a few iterations on the current rule.

Is it like FunctionalDependence in DFSchema ?

@berkaysynnada
Contributor Author

Is it like FunctionalDependence in DFSchema ?

I couldn't understand in what way you think there is a similarity; can you give more detail?
