feat: pushdown aggr, limit and sort plan #2495

Merged · 12 commits · Sep 28, 2023
8 changes: 6 additions & 2 deletions src/promql/src/extension_plan/series_divide.rs
@@ -139,7 +139,7 @@ impl ExecutionPlan for SeriesDivideExec {

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
let input_schema = self.input.schema();
let exprs = self
let exprs: Vec<PhysicalSortRequirement> = self
.tag_columns
.iter()
.map(|tag| PhysicalSortRequirement {
@@ -148,7 +148,11 @@ impl ExecutionPlan for SeriesDivideExec {
options: None,
})
.collect();
vec![Some(exprs)]
if !exprs.is_empty() {
vec![Some(exprs)]
} else {
vec![None]
}
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
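The behavioral point of this hunk: in DataFusion's `required_input_ordering`, `None` means "no requirement", while `Some(vec![])` still announces a (vacuous) ordering requirement, which this change avoids when there are no tag columns. A minimal, dependency-free sketch of the pattern, with a plain generic standing in for `PhysicalSortRequirement`:

```rust
/// Sketch only: collapse an empty requirement list into `None`, mirroring
/// the change above. `T` stands in for DataFusion's `PhysicalSortRequirement`.
fn normalize_requirement<T>(reqs: Vec<T>) -> Option<Vec<T>> {
    if reqs.is_empty() {
        None // no tag columns: declare "no ordering requirement" at all
    } else {
        Some(reqs)
    }
}

fn main() {
    assert_eq!(normalize_requirement(Vec::<u32>::new()), None);
    assert_eq!(normalize_requirement(vec![1, 2]), Some(vec![1, 2]));
}
```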
15 changes: 3 additions & 12 deletions src/query/src/dist_plan/analyzer.rs
@@ -142,7 +142,7 @@ impl PlanRewriter {
return true;
}

match Categorizer::check_plan(plan) {
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
@@ -161,7 +161,6 @@ impl PlanRewriter {
self.stage.push(plan)
}
},
Commutativity::CheckPartition
| Commutativity::NonCommutative
| Commutativity::Unimplemented
| Commutativity::Unsupported => {
@@ -351,11 +350,7 @@ mod test {

let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Aggregate: groupBy=[[]], aggr=[[AVG(t.number)]]",
" MergeScan [is_placeholder=false]",
]
.join("\n");
let expected = "MergeScan [is_placeholder=false]";
assert_eq!(expected, format!("{:?}", result));
}

@@ -402,11 +397,7 @@

let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Limit: skip=0, fetch=1",
" MergeScan [is_placeholder=false]",
]
.join("\n");
let expected = "MergeScan [is_placeholder=false]";
assert_eq!(expected, format!("{:?}", result));
}

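Both test expectations above collapse to a single `MergeScan` line because, with the partition columns threaded through, every node over the scan is judged commutative and folds into the remote plan. An illustrative-only model of that fold (the `Plan` struct and `pushdown_depth` are stand-ins, not types from analyzer.rs):

```rust
#[derive(Debug)]
struct Plan {
    name: &'static str,
    commutative: bool, // stand-in for the Categorizer::check_plan verdict
}

/// Walking up from the table scan, nodes keep folding into the remote
/// MergeScan until the first non-commutative node stops the process.
fn pushdown_depth(chain: &[Plan]) -> usize {
    chain.iter().take_while(|p| p.commutative).count()
}

fn main() {
    // e.g. `SELECT AVG(number) FROM t` on a single-partition table:
    // both nodes fold, leaving only `MergeScan [is_placeholder=false]`.
    let chain = [
        Plan { name: "Projection", commutative: true },
        Plan { name: "Aggregate", commutative: true },
    ];
    assert_eq!(pushdown_depth(&chain), chain.len());
    println!("{} of {} nodes folded: {:?}", pushdown_depth(&chain), chain.len(), chain);
}
```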
77 changes: 71 additions & 6 deletions src/query/src/dist_plan/commutativity.rs
@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use datafusion_expr::utils::exprlist_to_columns;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
@@ -29,15 +31,16 @@
TransformedCommutative(Option<Transformer>),
NonCommutative,
Unimplemented,
CheckPartition,
/// For unrelated plans like DDL
Unsupported,
}

pub struct Categorizer {}

impl Categorizer {
pub fn check_plan(plan: &LogicalPlan) -> Commutativity {
pub fn check_plan(plan: &LogicalPlan, partition_cols: Option<Vec<String>>) -> Commutativity {
let partition_cols = partition_cols.unwrap_or_default();

match plan {
LogicalPlan::Projection(proj) => {
for expr in &proj.expr {
@@ -51,11 +54,23 @@
// TODO(ruihang): Change this to Commutative once Like is supported in substrait
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
LogicalPlan::Window(_) => Commutativity::Unimplemented,
LogicalPlan::Aggregate(_) => {
LogicalPlan::Aggregate(aggr) => {
if Self::check_partition(&aggr.group_expr, &partition_cols) {
return Commutativity::Commutative;
}

// check all children exprs and use the strictest level
Commutativity::Unimplemented
}
LogicalPlan::Sort(_) => Commutativity::Unimplemented,
LogicalPlan::Sort(_) => {
if partition_cols.is_empty() {
return Commutativity::Commutative;
}

// sort plan needs to consider column priority
// We can implement a merge-sort on partially ordered data
Commutativity::Unimplemented
}
LogicalPlan::Join(_) => Commutativity::NonCommutative,
LogicalPlan::CrossJoin(_) => Commutativity::NonCommutative,
LogicalPlan::Repartition(_) => {
@@ -67,7 +82,17 @@
LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative,
LogicalPlan::Subquery(_) => Commutativity::Unimplemented,
LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented,
LogicalPlan::Limit(_) => Commutativity::PartialCommutative,
LogicalPlan::Limit(limit) => {
// Only execute `fetch` on remote nodes.
// wait for https://github.com/apache/arrow-datafusion/pull/7669
if partition_cols.is_empty() && limit.fetch.is_some() {
Commutativity::Commutative
} else if limit.skip == 0 && limit.fetch.is_some() {
Commutativity::PartialCommutative
} else {
Commutativity::Unimplemented
}
}
LogicalPlan::Extension(extension) => {
Self::check_extension_plan(extension.node.as_ref() as _)
}
@@ -93,7 +118,7 @@
|| name == SeriesDivide::name()
|| name == MergeScanLogicalPlan::name() =>
{
Commutativity::Commutative
Commutativity::Unimplemented
}
_ => Commutativity::Unsupported,
}
@@ -142,10 +167,50 @@
| Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,
}
}

/// Return true if the given exprs and partition cols satisfy the rule.

/// In this case the plan can be treated as fully commutative.
fn check_partition(exprs: &[Expr], partition_cols: &[String]) -> bool {
let mut ref_cols = HashSet::new();
if exprlist_to_columns(exprs, &mut ref_cols).is_err() {
return false;
}
let ref_cols = ref_cols
.into_iter()
.map(|c| c.flat_name())
.collect::<HashSet<_>>();
for col in partition_cols {
if !ref_cols.contains(col) {
return false;
}
}

true
}
}

pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;

pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
Some(plan.clone())
}

#[cfg(test)]
mod test {
use datafusion_expr::{LogicalPlanBuilder, Sort};

use super::*;

#[test]
fn sort_on_empty_partition() {
let plan = LogicalPlan::Sort(Sort {
expr: vec![],
input: Arc::new(LogicalPlanBuilder::empty(false).build().unwrap()),
fetch: None,
});
assert!(matches!(
Categorizer::check_plan(&plan, Some(vec![])),
Commutativity::Commutative
));
}
}
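Taken together, the new rules are compact enough to model in a self-contained sketch. This is not the crate's API: `check_partition` below takes plain strings where the real function inspects DataFusion `Expr`s, and the limit rule mirrors the `LogicalPlan::Limit` arm above.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum Commutativity {
    Commutative,        // whole node runs on the datanodes
    PartialCommutative, // node runs remotely and a copy stays on the frontend
    Unimplemented,      // pushdown stops at this node
}

/// Mirror of `check_partition`: the referenced columns (plain strings here
/// instead of DataFusion `Expr`s) must cover every partition column.
fn check_partition(ref_cols: &[&str], partition_cols: &[String]) -> bool {
    let refs: HashSet<&str> = ref_cols.iter().copied().collect();
    partition_cols.iter().all(|c| refs.contains(c.as_str()))
}

/// Mirror of the `LogicalPlan::Limit` arm in `check_plan`.
fn categorize_limit(skip: usize, fetch: Option<usize>, partition_cols: &[String]) -> Commutativity {
    if partition_cols.is_empty() && fetch.is_some() {
        Commutativity::Commutative // single partition: push the whole limit down
    } else if skip == 0 && fetch.is_some() {
        Commutativity::PartialCommutative // each region fetches, frontend re-limits
    } else {
        Commutativity::Unimplemented
    }
}

fn main() {
    let parts = vec!["host".to_string()];
    // GROUP BY host covers the partition column: the aggregate is commutative.
    assert!(check_partition(&["host", "ts"], &parts));
    // GROUP BY ts alone does not, so the aggregate cannot be fully pushed down.
    assert!(!check_partition(&["ts"], &parts));

    assert_eq!(categorize_limit(0, Some(10), &[]), Commutativity::Commutative);
    assert_eq!(categorize_limit(0, Some(10), &parts), Commutativity::PartialCommutative);
    assert_eq!(categorize_limit(5, None, &parts), Commutativity::Unimplemented);
}
```

The `PartialCommutative` case pairs with `partial_commutative_transformer`: the node is cloned, so for example a `Limit` runs once on every region and once more over the merged stream on the frontend.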
15 changes: 5 additions & 10 deletions src/query/src/dist_plan/merge_scan.rs
@@ -154,6 +154,7 @@ impl MergeScanExec {
let regions = self.regions.clone();
let region_query_handler = self.region_query_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
let schema = Self::arrow_schema_to_schema(self.schema())?;

let stream = Box::pin(stream!({
let _finish_timer = metric.finish_time().timer();
@@ -176,12 +177,14 @@

while let Some(batch) = stream.next().await {
let batch = batch?;
// reconstruct batch using `self.schema`
// to remove metadata and correct column name
let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
metric.record_output_batch_rows(batch.num_rows());
yield Ok(Self::remove_metadata_from_record_batch(batch));

if let Some(first_consume_timer) = first_consume_timer.as_mut().take() {
first_consume_timer.stop();
}
yield Ok(batch);
}
}
}));
@@ -193,14 +196,6 @@
}))
}

fn remove_metadata_from_record_batch(batch: RecordBatch) -> RecordBatch {
let arrow_schema = batch.schema.arrow_schema().as_ref();
let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
let schema_without_metadata =
Self::arrow_schema_to_schema(arrow_schema_without_metadata).unwrap();
RecordBatch::new(schema_without_metadata, batch.columns().iter().cloned()).unwrap()
}

fn arrow_schema_without_metadata(arrow_schema: &ArrowSchema) -> ArrowSchemaRef {
Arc::new(ArrowSchema::new(
arrow_schema
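The hunks above hoist the schema conversion out of the per-batch loop and drop `remove_metadata_from_record_batch`, which rebuilt a metadata-free schema (and `unwrap`ped) for every batch. A hedged sketch of the same idea using plain arrow-rs types — GreptimeDB wraps its own `RecordBatch`/`Schema`, so the constructors here differ from the diff:

```rust
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // Build the clean target schema once, before consuming the stream.
    let clean_schema = Arc::new(Schema::new(vec![Field::new(
        "number",
        DataType::Int64,
        false,
    )]));

    // Stand-in for one batch arriving from a remote region.
    let incoming = RecordBatch::try_new(
        clean_schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // Re-wrap the columns with the precomputed schema; `columns()` yields
    // `Arc`ed arrays, so no row data is copied.
    let rebuilt = RecordBatch::try_new(clean_schema, incoming.columns().to_vec())?;
    assert_eq!(rebuilt.num_rows(), 3);
    Ok(())
}
```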
2 changes: 1 addition & 1 deletion src/servers/src/mysql/server.rs
@@ -160,7 +160,7 @@ impl MysqlServer {
if let Err(e) = Self::do_handle(stream, spawn_ref, spawn_config).await {
// TODO(LFC): Write this error to client as well, in MySQL text protocol.
// Looks like we have to expose opensrv-mysql's `PacketWriter`?
warn!("Internal error occurred during query exec, server actively close the channel to let client try next time: {}.", e)
warn!(e; "Internal error occurred during query exec, server actively closes the channel to let the client retry")
}
decrement_gauge!(crate::metrics::METRIC_MYSQL_CONNECTIONS, 1.0);
});
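The one-line change switches from interpolating the error into the message string to passing it as a structured argument via the project's `warn!(err; "...")` telemetry macro. For an analogous pattern outside this codebase, the `tracing` crate (an assumption here, not what the repo uses) records the error as a field while keeping the message constant:

```rust
use tracing::warn;

fn report(e: &std::io::Error) {
    // `%e` records the error via its `Display` impl as a structured field,
    // so the message text itself stays constant and grep-friendly.
    warn!(error = %e, "Internal error occurred during query exec, server actively closes the channel to let the client retry");
}

fn main() {
    // Assumes the `tracing-subscriber` crate for a console subscriber.
    tracing_subscriber::fmt::init();
    report(&std::io::Error::new(std::io::ErrorKind::Other, "connection reset"));
}
```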
6 changes: 2 additions & 4 deletions tests/cases/distributed/explain/order_by.result
@@ -44,10 +44,8 @@ EXPLAIN SELECT a, b FROM test ORDER BY a, b;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] |
|_|_MergeScanExec: REDACTED
| logical_plan_| MergeScan [is_placeholder=false]_|
| physical_plan | MergeScanExec: REDACTED
|_|_|
+-+-+

17 changes: 4 additions & 13 deletions tests/cases/distributed/explain/single_partition.result
@@ -34,15 +34,8 @@ EXPLAIN SELECT SUM(i) FROM single_partition;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Aggregate: groupBy=[[]], aggr=[[SUM(single_partition.i)]]_|
|_|_Projection: single_partition.i_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[SUM(single_partition.i)]_|
|_|_CoalescePartitionsExec_|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[SUM(single_partition.i)]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[i@0 as i]_|
|_|_MergeScanExec: REDACTED
| logical_plan_| MergeScan [is_placeholder=false]_|
| physical_plan | MergeScanExec: REDACTED
|_|_|
+-+-+

@@ -56,10 +49,8 @@ EXPLAIN SELECT * FROM single_partition ORDER BY i DESC;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: single_partition.i DESC NULLS FIRST_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortExec: expr=[i@0 DESC]_|
|_|_MergeScanExec: REDACTED
| logical_plan_| MergeScan [is_placeholder=false]_|
| physical_plan | MergeScanExec: REDACTED
|_|_|
+-+-+

4 changes: 1 addition & 3 deletions tests/cases/distributed/explain/subqueries.result
@@ -116,9 +116,7 @@ EXPLAIN INSERT INTO other SELECT i, 2 FROM integers WHERE i=(SELECT MAX(i) FROM
| | Projection: integers.i |
| | MergeScan [is_placeholder=false] |
| | SubqueryAlias: __scalar_sq_1 |
| | Aggregate: groupBy=[[]], aggr=[[MAX(integers.i)]] |
| | Projection: integers.i |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
+--------------+-------------------------------------------------------------------+

drop table other;
14 changes: 4 additions & 10 deletions tests/cases/distributed/optimizer/order_by.result
@@ -15,8 +15,7 @@ explain select * from numbers order by number desc;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
@@ -28,8 +27,7 @@ explain select * from numbers order by number asc;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
@@ -41,9 +39,7 @@ explain select * from numbers order by number desc limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
@@ -56,9 +52,7 @@ explain select * from numbers order by number asc limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |