Skip to content

Commit

Permalink
POC: Demonstrate new GroupHashAggregate stream approach
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jun 30, 2023
1 parent 90b38b0 commit e02c35d
Show file tree
Hide file tree
Showing 8 changed files with 801 additions and 4 deletions.
18 changes: 17 additions & 1 deletion datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use std::sync::Arc;
mod bounded_aggregate_stream;
mod no_grouping;
mod row_hash;
mod row_hash2;
mod utils;

pub use datafusion_expr::AggregateFunction;
Expand All @@ -58,6 +59,8 @@ use datafusion_physical_expr::utils::{
get_finer_ordering, ordering_satisfy_requirement_concrete,
};

use self::row_hash2::GroupedHashAggregateStream2;

/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AggregateMode {
Expand Down Expand Up @@ -196,6 +199,7 @@ impl PartialEq for PhysicalGroupBy {
enum StreamType {
AggregateStream(AggregateStream),
GroupedHashAggregateStream(GroupedHashAggregateStream),
GroupedHashAggregateStream2(GroupedHashAggregateStream2),
BoundedAggregate(BoundedAggregateStream),
}

Expand All @@ -204,6 +208,7 @@ impl From<StreamType> for SendableRecordBatchStream {
match stream {
StreamType::AggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream2(stream) => Box::pin(stream),
StreamType::BoundedAggregate(stream) => Box::pin(stream),
}
}
Expand Down Expand Up @@ -711,12 +716,23 @@ impl AggregateExec {
partition,
aggregation_ordering,
)?))
} else if self.use_poc_group_by() {
Ok(StreamType::GroupedHashAggregateStream2(
GroupedHashAggregateStream2::new(self, context, partition)?,
))
} else {
Ok(StreamType::GroupedHashAggregateStream(
GroupedHashAggregateStream::new(self, context, partition)?,
))
}
}

/// Returns true if we should use the POC group by stream
/// TODO: check for actually supported aggregates, etc
fn use_poc_group_by(&self) -> bool {
//info!("AAL Checking POC group by: {self:#?}");
true
}
}

impl ExecutionPlan for AggregateExec {
Expand Down Expand Up @@ -980,7 +996,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
Arc::new(Schema::new(group_fields))
}

/// returns physical expressions to evaluate against a batch
/// returns physical expressions for arguments to evaluate against a batch
/// The expressions are different depending on `mode`:
/// * Partial: AggregateExpr::expressions
/// * Final: columns of `AggregateExpr::state_fields()`
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Hash aggregation through row format

use log::info;
use std::cmp::min;
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -119,6 +120,7 @@ impl GroupedHashAggregateStream {
context: Arc<TaskContext>,
partition: usize,
) -> Result<Self> {
info!("Creating GroupedHashAggregateStream");
let agg_schema = Arc::clone(&agg.schema);
let agg_group_by = agg.group_by.clone();
let agg_filter_expr = agg.filter_expr.clone();
Expand Down
Loading

0 comments on commit e02c35d

Please sign in to comment.