From ae8e612a5ee3f351c6b06f5968c8c28e1c7d8a39 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 25 Nov 2021 23:58:28 +0800 Subject: [PATCH] merge with main branch --- .../interpreters/interpreter_insert_into.rs | 53 +++++++++---------- query/src/interpreters/interpreter_select.rs | 9 +--- .../09_0004_remote_insert_into_select.sql | 4 +- 3 files changed, 29 insertions(+), 37 deletions(-) diff --git a/query/src/interpreters/interpreter_insert_into.rs b/query/src/interpreters/interpreter_insert_into.rs index b08fe5923bd0..56029e8d3364 100644 --- a/query/src/interpreters/interpreter_insert_into.rs +++ b/query/src/interpreters/interpreter_insert_into.rs @@ -23,6 +23,7 @@ use common_datavalues::DataType; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::TableInfo; +use common_planners::Expression; use common_planners::InsertIntoPlan; use common_planners::PlanNode; use common_planners::SelectPlan; @@ -30,8 +31,6 @@ use common_planners::SinkPlan; use common_planners::StagePlan; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; -use common_streams::SourceStream; -use common_streams::ValueSource; use futures::TryStreamExt; use crate::catalogs::Table; @@ -39,7 +38,6 @@ use crate::interpreters::plan_scheduler_ext; use crate::interpreters::utils::apply_plan_rewrite; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterPtr; -use crate::interpreters::SelectInterpreter; use crate::optimizers::Optimizers; use crate::pipelines::transforms::ExpressionExecutor; use crate::sessions::QueryContext; @@ -79,25 +77,7 @@ impl Interpreter for InsertIntoInterpreter { // if values are provided in SQL // e.g. `insert into ... value(...), ...` let values_exprs = self.plan.value_exprs_opt.clone().take().unwrap(); - let dummy = - DataSchemaRefExt::create(vec![DataField::new("dummy", DataType::UInt8, false)]); - let one_row_block = - DataBlock::create_by_array(dummy.clone(), vec![Series::new(vec![1u8])]); - - let blocks = values_exprs - .iter() - .map(|exprs| { - let executor = ExpressionExecutor::try_create( - "Insert into from values", - dummy.clone(), - self.plan.schema(), - exprs.clone(), - true, - )?; - executor.execute(&one_row_block) - }) - .collect::>>()?; - // merge into one block in sync mode + let blocks = self.block_from_values_exprs(values_exprs)?; let stream: SendableDataBlockStream = Box::pin(futures::stream::iter(vec![DataBlock::concat_blocks( &blocks, @@ -158,11 +138,8 @@ impl InsertIntoInterpreter { let cast_needed = select_schema != output_schema; // optimize and rewrite the SelectPlan.input - let optimized_plan = apply_plan_rewrite( - self.ctx.clone(), - Optimizers::create(self.ctx.clone()), - &select_plan.input, - )?; + let optimized_plan = + apply_plan_rewrite(Optimizers::create(self.ctx.clone()), &select_plan.input)?; // rewrite the optimized the plan let rewritten_plan = match optimized_plan { @@ -193,4 +170,26 @@ impl InsertIntoInterpreter { }; Ok(rewritten_plan) } + + fn block_from_values_exprs( + &self, + values_exprs: Vec>, + ) -> Result> { + let dummy = DataSchemaRefExt::create(vec![DataField::new("dummy", DataType::UInt8, false)]); + let one_row_block = DataBlock::create_by_array(dummy.clone(), vec![Series::new(vec![1u8])]); + + values_exprs + .iter() + .map(|exprs| { + let executor = ExpressionExecutor::try_create( + "Insert into from values", + dummy.clone(), + self.plan.schema(), + exprs.clone(), + true, + )?; + executor.execute(&one_row_block) + }) + .collect::>>() + } } diff --git a/query/src/interpreters/interpreter_select.rs b/query/src/interpreters/interpreter_select.rs index 58b2af22635c..ee5847f6d77d 100644 --- a/query/src/interpreters/interpreter_select.rs +++ b/query/src/interpreters/interpreter_select.rs @@ -12,15 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; use std::sync::Arc; -use std::task::Context; -use common_base::tokio::macros::support::Pin; -use common_base::tokio::macros::support::Poll; -use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::Result; use common_planners::PlanNode; @@ -46,7 +39,7 @@ impl SelectInterpreter { } fn rewrite_plan(&self) -> Result { - apply_plan_rewrite(self.ctx.clone(), &self.select.input) + apply_plan_rewrite(Optimizers::create(self.ctx.clone()), &self.select.input) } } diff --git a/tests/suites/0_stateless/09_0004_remote_insert_into_select.sql b/tests/suites/0_stateless/09_0004_remote_insert_into_select.sql index 193315d34999..884dfe7fd9fc 100644 --- a/tests/suites/0_stateless/09_0004_remote_insert_into_select.sql +++ b/tests/suites/0_stateless/09_0004_remote_insert_into_select.sql @@ -16,12 +16,12 @@ select sum(a) from n1; CREATE TABLE n2(a UInt64, b UInt64); insert into n2 select number, number + 1 from numbers(10000); -select count(*) from n2; +select count(a) from n2; select sum(a), sum(b) from n2; -- "self reference" insert into n2 select * from n2; -select count(*) from n2; +select count(a) from n2; select sum(a), sum(b) from n2; -- aggregation