
ISSUE-2814: introducing distributed insertion #2945

Merged: 28 commits into databendlabs:main on Nov 26, 2021

Conversation

@dantengsky (Member) commented Nov 22, 2021

I hereby agree to the terms of the CLA available at: https://databend.rs/policies/cla/

Summary

  • addresses issue #2914 (Proposal: supports parallel insertion)

  • Table::append has been split into append and commit

    • append returns a table-specific SendableDataBlockStream

      for the fuse table, a log of append operations is encoded into the stream; other writable tables return an empty stream

    • commit takes the stream returned by append and interprets it in a table-specific way

      for the fuse table, this is a stream of segment information, from which the table generates a new snapshot and commits it to the meta-server.

      for other writable tables, the stream is ignored (and Ok is returned)

    InsertIntoInterpreter is in charge of feeding the upstream values into Table::append and passing whatever append returns on to Table::commit; a rough sketch of the split trait is shown below.
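
    A minimal sketch of what the split might look like on the Table trait, using simplified stand-in types (DataBlock, QueryContext, and the error type are placeholders; the real signatures in the codebase may differ):

        // Hedged sketch only: stand-in types, not the actual trait definition.
        use std::pin::Pin;
        use std::sync::Arc;

        use futures::Stream;

        struct DataBlock;    // stand-in for the real block type
        struct QueryContext; // stand-in for the query context
        type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
        type SendableDataBlockStream = Pin<Box<dyn Stream<Item = Result<DataBlock>> + Send>>;

        #[async_trait::async_trait]
        trait Table: Send + Sync {
            /// Writes the incoming blocks and returns a table-specific stream:
            /// the fuse table encodes a log of append operations into it,
            /// while other writable tables return an empty stream.
            async fn append(
                &self,
                ctx: Arc<QueryContext>,
                stream: SendableDataBlockStream,
            ) -> Result<SendableDataBlockStream>;

            /// Consumes the stream produced by `append`: the fuse table reads
            /// the segment information, generates a new snapshot, and commits
            /// it to the meta-server; other writable tables ignore the stream
            /// and return Ok(()).
            async fn commit(
                &self,
                ctx: Arc<QueryContext>,
                operations: SendableDataBlockStream,
            ) -> Result<()>;
        }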

  • Where applicable, a distributed append schedule is used

    e.g. for statements like insert into t1 select * from numbers(10000), the append is executed in a distributed manner (in cluster mode), while the commit is performed by a single query node.

    but for statements like insert into n1 select sum(number) from numbers(1000) group by number % 3, the append is still performed within a single query node.

  • the main idea of planning goes like this:

            // before scheduling; SinkPlan is where Table::append happens
            let rewritten_plan = match optimized_plan {
                // if it is a StagePlan node, we insert a SinkPlan between the Stage and Stage.input
                // i.e.
                //    StagePlan <~ PlanNodeA  => StagePlan <~ Sink <~ PlanNodeA
                PlanNode::Stage(r) => {
                    let prev_input = r.input.clone();
                    let sink = PlanNode::Sink(SinkPlan {
                        table_info: table_info.clone(),
                        input: prev_input,
                        cast_needed,
                    });
                    PlanNode::Stage(StagePlan {
                        kind: r.kind,
                        input: Arc::new(sink),
                        scatters_expr: r.scatters_expr,
                    })
                }
                // otherwise, we simply prepend a SinkPlan
                // i.e.
                //    node <~ PlanNodeA  => Sink <~ node <~ PlanNodeA
                node => PlanNode::Sink(SinkPlan {
                    table_info: table_info.clone(),
                    input: Arc::new(node),
                    cast_needed,
                }),
            };

    but 1) since planning is rather sophisticated, please let me know if there are cases that the above arrangement cannot cover; 2) currently the above logic is embedded in InsertIntoInterpreter; if somewhere else is preferred, please let me know. (A rough sketch of the SinkPlan node itself follows the test cases below.)

    the following cases have been tested in both standalone and 3-node cluster setups:

    create table n1(a uint64);
    insert into n1 select number from numbers(10000);
    select sum(a) from n1;
    
    CREATE TABLE n2(a UInt64, b UInt64);
    insert into n2 select number, number + 1 from numbers(10000);
    select count(a) from n2;
    select sum(a), sum(b) from n2;
    
    -- "self reference"
    insert into n2 select * from n2;
    select count(a) from n2;
    select sum(a), sum(b) from n2;
    
    -- aggregation
    create table n3(a uint64, b uint64);
    insert into n3 select sum(a), sum(b) from n2;
    select * from n3;
    
    -- cast
    create table s1(a String, b String);
    insert into s1 select number, number + 1 from numbers(10000);
    select sum(cast(a as uint64)), sum(cast(b as uint64)) from s1;
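
    As mentioned above, a rough sketch of the SinkPlan node itself (field names follow the snippet; the surrounding types are simplified stand-ins for illustration):

        // Simplified stand-ins; the real planner types carry more information.
        use std::sync::Arc;

        #[derive(Clone)]
        struct TableInfo {
            database: String,
            name: String,
        }

        #[derive(Clone)]
        enum PlanNode {
            // the node where Table::append happens
            Sink(SinkPlan),
            // ... other plan node variants elided
            Empty,
        }

        #[derive(Clone)]
        struct SinkPlan {
            // which table the produced blocks are written into
            table_info: TableInfo,
            // the upstream plan that produces the blocks to insert
            input: Arc<PlanNode>,
            // whether the blocks must be cast to the table schema first
            // (e.g. inserting numbers into String columns, as in the s1 case)
            cast_needed: bool,
        }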
    
  • misc.

    • about the ExpressionTransform that @sundy-li kindly suggested

      in this PR no explicit expression transform has been added; instead, a CastStream is enabled inside the SinkTransform when necessary, which hopefully achieves the same thing (a conceptual sketch follows this list).

    • @zhang2014 SelectInterpreter has been slightly refactored, since InsertIntoInterpreter reuses a lot of its code

      some common code has been extracted into plan_scheduler_ext.rs (suggestions for a better name are welcome)

    • @BohuTANG the refactoring of fuse::util has been postponed (to a dedicated PR)

      so that this PR is easier to review

    • explain pipeline insert ... does not work yet
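
    A conceptual sketch of the cast step mentioned above, with hypothetical stand-in types and a hypothetical cast_block helper (this is not the actual CastStream/SinkTransform code):

        use std::pin::Pin;
        use std::sync::Arc;

        use futures::{Stream, StreamExt};

        struct DataBlock;                                // stand-in
        struct DataSchema;                               // stand-in
        type Result<T> = std::result::Result<T, String>; // stand-in error type
        type SendableDataBlockStream = Pin<Box<dyn Stream<Item = Result<DataBlock>> + Send>>;

        // hypothetical helper: cast a single block to the target schema
        fn cast_block(block: DataBlock, _schema: &DataSchema) -> Result<DataBlock> {
            Ok(block)
        }

        // inside the sink: wrap the upstream block stream with a cast step
        // only when the select output schema differs from the table schema
        fn maybe_cast(
            input: SendableDataBlockStream,
            schema: Arc<DataSchema>,
            cast_needed: bool,
        ) -> SendableDataBlockStream {
            if !cast_needed {
                // schemas already match: pass the blocks through untouched
                return input;
            }
            Box::pin(input.map(move |block| block.and_then(|b| cast_block(b, &schema))))
        }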

Changelog

  • Improvement
  • Not for changelog (changelog entry is not required)

Related Issues

Fixes #2914

Test Plan

Unit Tests

Stateless Tests

@databend-bot (Member)

Thanks for the contribution!
I have applied any labels matching special text in your PR Changelog.

Please review the labels and make any necessary changes.

@codecov-commenter commented Nov 22, 2021

Codecov Report

Merging #2945 (ae8e612) into main (a69c4cf) will increase coverage by 0%.
The diff coverage is 71%.


@@          Coverage Diff           @@
##            main   #2945    +/-   ##
======================================
  Coverage     68%     68%            
======================================
  Files        651     656     +5     
  Lines      34223   34363   +140     
======================================
+ Hits       23414   23529   +115     
- Misses     10809   10834    +25     
Impacted Files Coverage Δ
common/planners/src/plan_rewriter.rs 50% <0%> (-1%) ⬇️
.../src/datasources/table/fuse/meta/table_snapshot.rs 63% <ø> (ø)
query/src/datasources/table/fuse/read_plan.rs 97% <ø> (ø)
...y/src/datasources/table/fuse/table_test_fixture.rs 94% <ø> (+<1%) ⬆️
query/src/interpreters/interpreter_copy.rs 0% <0%> (ø)
query/src/interpreters/plan_scheduler.rs 32% <0%> (-2%) ⬇️
query/src/interpreters/plan_scheduler_ext.rs 44% <44%> (ø)
common/planners/src/plan_node.rs 43% <50%> (+6%) ⬆️
common/planners/src/plan_sink.rs 50% <50%> (ø)
query/src/catalogs/table.rs 73% <50%> (+<1%) ⬆️
... and 29 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update a69c4cf...ae8e612.

@@ -215,3 +217,29 @@ pub fn merge_stats(schema: &DataSchema, l: &Stats, r: &Stats) -> Result<Stats> {
};
Ok(s)
}

pub fn merge_appends(

@BohuTANG (Member) commented Nov 23, 2021


Emm, it seems it's time to move this helper into a folder named something like statistics; 'util' is not a good name for others exploring the code. It would be nice to separate these into their own directories :)

@dantengsky (Member, Author)

Agree, there is a pending PR #2408; shall I merge it into this PR?

@dantengsky (Member, Author)

Oh, please do not bother reviewing PR #2480; I forgot that it has accumulated lots of irrelevant changes. Let me do the refactoring in this PR directly.

@BohuTANG (Member) commented Nov 23, 2021

Agree.
Sorry, I missed that there is already a refinement issue #2408; we can do the refactoring in that PR if it doesn't feel right here.

@dantengsky (Member, Author)

got it

@dantengsky (Member, Author)

btw, although unit tests and stateless tests passed, there are still some known issues. Trying to fix them.

@dantengsky dantengsky marked this pull request as ready for review November 25, 2021 16:11
@BohuTANG (Member)

/lgtm

@databend-bot (Member)

Wait for another reviewer approval

@BohuTANG (Member)

Merge first, some comments also welcome...

@BohuTANG BohuTANG merged commit 1f1e7ff into databendlabs:main Nov 26, 2021
@BohuTANG (Member)

@dantengsky I think there are still some follow-up tasks for this PR; feel free to open issues to track them, as you mentioned in the summary :)
