Enhance EnforceDistribution capabilities to fix suboptimal plans #7671
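This PR targets plans where a CPU-bound operator such as a filter is left running on too few partitions. A minimal sketch of the scenario, assuming the DataFusion `SessionContext`/`DataFrame` API of this release, with placeholder table, file, and column names:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Give the optimizer headroom to parallelize.
    let config = SessionConfig::new().with_target_partitions(8);
    let ctx = SessionContext::with_config(config);

    // Hypothetical single-partition input; any registered table behaves the same.
    ctx.register_csv("source", "data.csv", CsvReadOptions::new())
        .await?;

    // A filtered scan with no manual repartitioning: previously such a plan
    // could stay single-partitioned; EnforceDistribution is now expected to
    // repartition it so the filter runs in parallel.
    let df = ctx
        .table("source")
        .await?
        .filter(col("column1").gt_eq(lit(0)))?;

    // Print the optimized logical and physical plans to inspect the result.
    df.explain(false, false)?.show().await?;
    Ok(())
}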

Merged 19 commits on Sep 29, 2023
60 changes: 35 additions & 25 deletions datafusion/core/src/datasource/listing/table.rs
@@ -20,16 +20,8 @@
use std::str::FromStr;
use std::{any::Any, sync::Arc};

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion_common::FileTypeWriterOptions;
use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema};
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
use futures::{future, stream, StreamExt, TryStreamExt};
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use super::PartitionedFile;

use crate::datasource::file_format::file_compression_type::{
FileCompressionType, FileTypeExt,
@@ -54,13 +46,21 @@ use crate::{
logical_expr::Expr,
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};
use datafusion_common::FileType;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use datafusion_common::{
internal_err, plan_err, project_schema, FileType, FileTypeWriterOptions, SchemaExt,
ToDFSchema,
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};

use super::PartitionedFile;

use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};

/// Configuration for creating a [`ListingTable`]
#[derive(Debug, Clone)]
@@ -996,6 +996,9 @@ impl ListingTable {

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs::File;

use super::*;
use crate::datasource::{provider_as_source, MemTable};
use crate::execution::options::ArrowReadOptions;
@@ -1010,14 +1013,13 @@ mod tests {
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
};

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::assert_contains;
use datafusion_common::GetExt;
use datafusion_expr::LogicalPlanBuilder;
use datafusion_common::{assert_contains, GetExt, ScalarValue};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};

use rstest::*;
use std::collections::HashMap;
use std::fs::File;
use tempfile::TempDir;

/// It creates dummy file and checks if it can create unbounded input executors.
@@ -2048,6 +2050,7 @@ }
}
None => SessionContext::new(),
};
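// The expected number of output files tracks the session's configured target
// partition count, so read it from the config instead of hard-coding a value.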
let target_partition_number = session_ctx.state().config().target_partitions();

// Create a new schema with one field called "a" of type Int32
let schema = Arc::new(Schema::new(vec![Field::new(
@@ -2056,6 +2059,12 @@
false,
)]));

let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
Box::new(Expr::Column("column1".into())),
Operator::GtEq,
Box::new(Expr::Literal(ScalarValue::Int32(Some(0)))),
));
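// The predicate built above encodes `column1 >= 0`; with the expr builder API
// the equivalent expression would be col("column1").gt_eq(lit(0)).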

// Create a new batch of data to insert into the table
let batch = RecordBatch::try_new(
schema.clone(),
@@ -2136,8 +2145,10 @@ let source = provider_as_source(source_table);
let source = provider_as_source(source_table);
// Create a table scan logical plan to read from the source table
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
.repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))?
.filter(filter_predicate)?
.build()?;
// Since the logical plan contains a filter, increasing parallelism is helpful.
// Therefore, we will have 8 partitions in the final plan.
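// (Assumed mechanism, not asserted by this test: EnforceDistribution inserts a
// round-robin RepartitionExec above the scan to reach the target partition count.)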
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
@@ -2146,7 +2157,6 @@ .state()
.state()
.create_physical_plan(&insert_into_table)
.await?;

// Execute the physical plan and collect the results
let res = collect(plan, session_ctx.task_ctx()).await?;
// Insert returns the number of rows written; in our case this would be 6.
@@ -2178,9 +2188,9 @@
// Assert that the batches read from the file match the expected result.
assert_batches_eq!(expected, &batches);

// Assert that 6 files were added to the table
// Assert that `target_partition_number` files were added to the table
// (each output partition writes one file).
let num_files = tmp_dir.path().read_dir()?.count();
assert_eq!(num_files, 6);
assert_eq!(num_files, target_partition_number);

// Create a physical plan from the insert plan
let plan = session_ctx
@@ -2221,9 +2231,9 @@
// Assert that the batches read from the file after the second append match the expected result.
assert_batches_eq!(expected, &batches);

// Assert that another 6 files were added to the table
// Assert that another `target_partition_number` files were added to the table.
let num_files = tmp_dir.path().read_dir()?.count();
assert_eq!(num_files, 12);
assert_eq!(num_files, 2 * target_partition_number);

// Return Ok if the function completed successfully
Ok(())