Skip to content

Commit

Permalink
merge with main branch
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Nov 25, 2021
1 parent dbe45aa commit ae8e612
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 37 deletions.
53 changes: 26 additions & 27 deletions query/src/interpreters/interpreter_insert_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,21 @@ use common_datavalues::DataType;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::TableInfo;
use common_planners::Expression;
use common_planners::InsertIntoPlan;
use common_planners::PlanNode;
use common_planners::SelectPlan;
use common_planners::SinkPlan;
use common_planners::StagePlan;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use common_streams::SourceStream;
use common_streams::ValueSource;
use futures::TryStreamExt;

use crate::catalogs::Table;
use crate::interpreters::plan_scheduler_ext;
use crate::interpreters::utils::apply_plan_rewrite;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::interpreters::SelectInterpreter;
use crate::optimizers::Optimizers;
use crate::pipelines::transforms::ExpressionExecutor;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -79,25 +77,7 @@ impl Interpreter for InsertIntoInterpreter {
// if values are provided in SQL
// e.g. `insert into ... value(...), ...`
let values_exprs = self.plan.value_exprs_opt.clone().take().unwrap();
let dummy =
DataSchemaRefExt::create(vec![DataField::new("dummy", DataType::UInt8, false)]);
let one_row_block =
DataBlock::create_by_array(dummy.clone(), vec![Series::new(vec![1u8])]);

let blocks = values_exprs
.iter()
.map(|exprs| {
let executor = ExpressionExecutor::try_create(
"Insert into from values",
dummy.clone(),
self.plan.schema(),
exprs.clone(),
true,
)?;
executor.execute(&one_row_block)
})
.collect::<Result<Vec<_>>>()?;
// merge into one block in sync mode
let blocks = self.block_from_values_exprs(values_exprs)?;
let stream: SendableDataBlockStream =
Box::pin(futures::stream::iter(vec![DataBlock::concat_blocks(
&blocks,
Expand Down Expand Up @@ -158,11 +138,8 @@ impl InsertIntoInterpreter {
let cast_needed = select_schema != output_schema;

// optimize and rewrite the SelectPlan.input
let optimized_plan = apply_plan_rewrite(
self.ctx.clone(),
Optimizers::create(self.ctx.clone()),
&select_plan.input,
)?;
let optimized_plan =
apply_plan_rewrite(Optimizers::create(self.ctx.clone()), &select_plan.input)?;

// rewrite the optimized the plan
let rewritten_plan = match optimized_plan {
Expand Down Expand Up @@ -193,4 +170,26 @@ impl InsertIntoInterpreter {
};
Ok(rewritten_plan)
}

fn block_from_values_exprs(
&self,
values_exprs: Vec<Vec<Expression>>,
) -> Result<Vec<DataBlock>> {
let dummy = DataSchemaRefExt::create(vec![DataField::new("dummy", DataType::UInt8, false)]);
let one_row_block = DataBlock::create_by_array(dummy.clone(), vec![Series::new(vec![1u8])]);

values_exprs
.iter()
.map(|exprs| {
let executor = ExpressionExecutor::try_create(
"Insert into from values",
dummy.clone(),
self.plan.schema(),
exprs.clone(),
true,
)?;
executor.execute(&one_row_block)
})
.collect::<Result<Vec<_>>>()
}
}
9 changes: 1 addition & 8 deletions query/src/interpreters/interpreter_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;

use common_base::tokio::macros::support::Pin;
use common_base::tokio::macros::support::Poll;
use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_exception::Result;
use common_planners::PlanNode;
Expand All @@ -46,7 +39,7 @@ impl SelectInterpreter {
}

fn rewrite_plan(&self) -> Result<PlanNode> {
apply_plan_rewrite(self.ctx.clone(), &self.select.input)
apply_plan_rewrite(Optimizers::create(self.ctx.clone()), &self.select.input)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ select sum(a) from n1;

CREATE TABLE n2(a UInt64, b UInt64);
insert into n2 select number, number + 1 from numbers(10000);
select count(*) from n2;
select count(a) from n2;
select sum(a), sum(b) from n2;

-- "self reference"
insert into n2 select * from n2;
select count(*) from n2;
select count(a) from n2;
select sum(a), sum(b) from n2;

-- aggregation
Expand Down

0 comments on commit ae8e612

Please sign in to comment.