Skip to content

Commit

Permalink
Merge pull request #2945 from dantengsky/fix-2914
Browse files Browse the repository at this point in the history
ISSUE-2814: introducing distributed insertion
  • Loading branch information
BohuTANG authored Nov 26, 2021
2 parents b867f54 + ae8e612 commit 1f1e7ff
Show file tree
Hide file tree
Showing 38 changed files with 982 additions and 494 deletions.
35 changes: 12 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions common/dal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ common-metrics= {path = "../metrics"}

async-compat = "0.2.1"
async-trait = "0.1"
azure_core_mirror = "0.1.0"
azure_storage_mirror = { version = "0.1.0", features = ["blob"] }
bytes = "1"
futures = "0.3"
metrics = "0.17.0"
reqwest = "0.11"
rusoto_core = "0.47.0"
rusoto_s3 = "0.47.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
metrics = "0.17.0"
azure_core_mirror = "0.1.0"
azure_storage_mirror = { version = "0.1.0", features = ["blob"] }
reqwest = "0.11"

[dev-dependencies]
pretty_assertions = "1.0"
Expand Down
52 changes: 52 additions & 0 deletions common/dal/tests/it/accessors/local.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use common_base::tokio;
use common_dal::DataAccessor;
use common_dal::Local;
use tempfile::TempDir;

async fn local_read(loops: u32) -> common_exception::Result<()> {
let tmp_root_dir = TempDir::new().unwrap();
let root_path = tmp_root_dir.path().to_str().unwrap();
let local_da = Local::new(root_path);

let mut files = vec![];
for i in 0..loops {
let file = format!("test_{}", i);
let random_bytes: Vec<u8> = (0..122).map(|_| rand::random::<u8>()).collect();
local_da.put(file.as_str(), random_bytes).await?;
files.push(file)
}

for x in files {
local_da.read(x.as_str()).await?;
}
Ok(())
}

// enable this if need to re-produce issue #2997
#[tokio::test]
#[ignore]
async fn test_da_local_hangs() -> common_exception::Result<()> {
let read_fut = local_read(100);
futures::executor::block_on(read_fut)
}

#[tokio::test]
async fn test_da_local_normal() -> common_exception::Result<()> {
let read_fut = local_read(1000);
read_fut.await
}
1 change: 1 addition & 0 deletions common/dal/tests/it/accessors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

mod aws_s3;
mod azure_blob;
mod local;
3 changes: 3 additions & 0 deletions common/planners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ mod plan_rewriter;
mod plan_select;
mod plan_setting;
mod plan_show_table_create;
mod plan_sink;
mod plan_sort;
mod plan_stage;
mod plan_statistics;
Expand Down Expand Up @@ -127,6 +128,8 @@ pub use plan_select::SelectPlan;
pub use plan_setting::SettingPlan;
pub use plan_setting::VarValue;
pub use plan_show_table_create::ShowCreateTablePlan;
pub use plan_sink::SinkPlan;
pub use plan_sink::SINK_SCHEMA;
pub use plan_sort::SortPlan;
pub use plan_stage::StageKind;
pub use plan_stage::StagePlan;
Expand Down
37 changes: 5 additions & 32 deletions common/planners/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
use std::sync::Arc;

use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::plan_broadcast::BroadcastPlan;
use crate::plan_subqueries_set::SubQueriesSetPlan;
Expand Down Expand Up @@ -47,6 +45,7 @@ use crate::RemotePlan;
use crate::SelectPlan;
use crate::SettingPlan;
use crate::ShowCreateTablePlan;
use crate::SinkPlan;
use crate::SortPlan;
use crate::StagePlan;
use crate::TruncateTablePlan;
Expand All @@ -69,6 +68,7 @@ pub enum PlanNode {
Limit(LimitPlan),
LimitBy(LimitByPlan),
ReadSource(ReadDataSourcePlan),
Sink(SinkPlan),
Select(SelectPlan),
Explain(ExplainPlan),
CreateDatabase(CreateDatabasePlan),
Expand Down Expand Up @@ -126,6 +126,7 @@ impl PlanNode {
PlanNode::AlterUser(v) => v.schema(),
PlanNode::DropUser(v) => v.schema(),
PlanNode::GrantPrivilege(v) => v.schema(),
PlanNode::Sink(v) => v.schema(),
PlanNode::Copy(v) => v.schema(),
}
}
Expand Down Expand Up @@ -164,6 +165,7 @@ impl PlanNode {
PlanNode::AlterUser(_) => "AlterUser",
PlanNode::DropUser(_) => "DropUser",
PlanNode::GrantPrivilege(_) => "GrantPrivilegePlan",
PlanNode::Sink(_) => "SinkPlan",
PlanNode::Copy(_) => "CopyPlan",
}
}
Expand All @@ -183,6 +185,7 @@ impl PlanNode {
PlanNode::Select(v) => vec![v.input.clone()],
PlanNode::Sort(v) => vec![v.input.clone()],
PlanNode::SubQueryExpression(v) => v.get_inputs(),
PlanNode::Sink(v) => vec![v.input.clone()],

_ => vec![],
}
Expand All @@ -191,34 +194,4 @@ impl PlanNode {
pub fn input(&self, n: usize) -> Arc<PlanNode> {
self.inputs()[n].clone()
}

pub fn set_inputs(&mut self, inputs: Vec<&PlanNode>) -> Result<()> {
if inputs.is_empty() {
return Result::Err(ErrorCode::BadPlanInputs("Inputs must not be empty"));
}

match self {
PlanNode::Stage(v) => v.set_input(inputs[0]),
PlanNode::Broadcast(v) => v.set_input(inputs[0]),
PlanNode::Projection(v) => v.set_input(inputs[0]),
PlanNode::Expression(v) => v.set_input(inputs[0]),
PlanNode::AggregatorPartial(v) => v.set_input(inputs[0]),
PlanNode::AggregatorFinal(v) => v.set_input(inputs[0]),
PlanNode::Filter(v) => v.set_input(inputs[0]),
PlanNode::Having(v) => v.set_input(inputs[0]),
PlanNode::Limit(v) => v.set_input(inputs[0]),
PlanNode::Explain(v) => v.set_input(inputs[0]),
PlanNode::Select(v) => v.set_input(inputs[0]),
PlanNode::Sort(v) => v.set_input(inputs[0]),
PlanNode::SubQueryExpression(v) => v.set_inputs(inputs),
_ => {
return Err(ErrorCode::UnImplement(format!(
"UnImplement set_inputs for {:?}",
self
)));
}
}

Ok(())
}
}
6 changes: 6 additions & 0 deletions common/planners/src/plan_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::RemotePlan;
use crate::SelectPlan;
use crate::SettingPlan;
use crate::ShowCreateTablePlan;
use crate::SinkPlan;
use crate::SortPlan;
use crate::StagePlan;
use crate::TruncateTablePlan;
Expand Down Expand Up @@ -114,6 +115,7 @@ pub trait PlanRewriter {
PlanNode::AlterUser(plan) => self.alter_user(plan),
PlanNode::DropUser(plan) => self.drop_user(plan),
PlanNode::GrantPrivilege(plan) => self.grant_privilege(plan),
PlanNode::Sink(plan) => self.rewrite_sink(plan),
}
}

Expand Down Expand Up @@ -367,6 +369,10 @@ pub trait PlanRewriter {
fn grant_privilege(&mut self, plan: &GrantPrivilegePlan) -> Result<PlanNode> {
Ok(PlanNode::GrantPrivilege(plan.clone()))
}

fn rewrite_sink(&mut self, plan: &SinkPlan) -> Result<PlanNode> {
Ok(PlanNode::Sink(plan.clone()))
}
}

pub struct RewriteHelper {}
Expand Down
45 changes: 45 additions & 0 deletions common/planners/src/plan_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2020 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_datavalues::DataField;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataSchemaRefExt;
use common_datavalues::DataType;
use common_meta_types::TableInfo;
use lazy_static::lazy_static;

use crate::PlanNode;

lazy_static! {
pub static ref SINK_SCHEMA: DataSchemaRef = DataSchemaRefExt::create(vec![
DataField::new("seg_loc", DataType::String, false),
DataField::new("seg_info", DataType::String, false),
]);
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct SinkPlan {
pub table_info: TableInfo,
pub input: Arc<PlanNode>,
pub cast_needed: bool,
}

impl SinkPlan {
/// Return sink schema
pub fn schema(&self) -> DataSchemaRef {
SINK_SCHEMA.clone()
}
}
5 changes: 5 additions & 0 deletions common/planners/src/plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::RemotePlan;
use crate::SelectPlan;
use crate::SettingPlan;
use crate::ShowCreateTablePlan;
use crate::SinkPlan;
use crate::SortPlan;
use crate::StagePlan;
use crate::TruncateTablePlan;
Expand Down Expand Up @@ -127,6 +128,7 @@ pub trait PlanVisitor {
PlanNode::AlterUser(plan) => self.visit_alter_user(plan),
PlanNode::DropUser(plan) => self.visit_drop_user(plan),
PlanNode::GrantPrivilege(plan) => self.visit_grant_privilege(plan),
PlanNode::Sink(plan) => self.visit_append(plan),
}
}

Expand Down Expand Up @@ -297,4 +299,7 @@ pub trait PlanVisitor {
fn visit_kill_query(&mut self, _: &KillPlan) -> Result<()> {
Ok(())
}
fn visit_append(&mut self, _: &SinkPlan) -> Result<()> {
Ok(())
}
}
Loading

0 comments on commit 1f1e7ff

Please sign in to comment.