From 0d8d91a2583a445a9e234d81fa99c230ffa8d187 Mon Sep 17 00:00:00 2001 From: Manoj Inukolunu Date: Sat, 13 Jan 2024 11:38:02 +0530 Subject: [PATCH 1/7] Dont consider struct fields for filtering in parquet --- .../datasource/physical_plan/parquet/mod.rs | 66 ++++++++++++++++++- .../physical_plan/parquet/page_filter.rs | 2 +- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 84b312520161..b42107384d97 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -782,7 +782,7 @@ mod tests { array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field, SchemaBuilder}, }; - use arrow_array::Date64Array; + use arrow_array::{Date64Array, GenericStringArray, StructArray}; use chrono::{TimeZone, Utc}; use datafusion_common::{assert_contains, ToDFSchema}; use datafusion_common::{FileType, GetExt, ScalarValue}; @@ -795,6 +795,9 @@ mod tests { use object_store::ObjectMeta; use std::fs::{self, File}; use std::io::Write; + use arrow_array::cast::AsArray; + use arrow_schema::Fields; + use parquet::arrow::ArrowWriter; use tempfile::TempDir; use url::Url; @@ -2136,4 +2139,65 @@ mod tests { let execution_props = ExecutionProps::new(); create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap() } + + + #[tokio::test] + async fn test_struct_filter_parquet() -> Result<()> { + let tmp_dir = TempDir::new()?; + let path =tmp_dir.path().to_str().unwrap().to_string()+"/test.parquet"; + write_file(&path); + let ctx = SessionContext::new(); + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); + ctx.register_listing_table("base_table", path, opt, None, None) + .await + .unwrap(); + let sql = "select * from base_table where name='test02'"; + let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap(); + assert_eq!(batch.len(),1); + let expected = ["+---------------------+----+--------+", + "| struct | id | name |", + "+---------------------+----+--------+", + "| {id: 4, name: aaa2} | 2 | test02 |", + "+---------------------+----+--------+"]; + crate::assert_batches_eq!(expected, &batch); + Ok(()) + } + + fn write_file(file: &String) { + let struct_fields = Fields::from(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + ]); + let schema = Schema::new(vec![ + Field::new("struct", DataType::Struct(struct_fields.clone()), false), + Field::new("id", DataType::Int64, true), + Field::new("name", DataType::Utf8, false), + ]); + let id_array = Int64Array::from(vec![Some(1), Some(2)]); + let columns = vec![ + Arc::new(Int64Array::from(vec![3, 4])) as _, + Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _, + ]; + let struct_array = StructArray::new(struct_fields, columns, None); + + let name_array = StringArray::from(vec![Some("test01"), Some("test02")]); + let schema = Arc::new(schema); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(struct_array), + Arc::new(id_array), + Arc::new(name_array), + ], + ) + .unwrap(); + let file = File::create(file).unwrap(); + let w_opt = WriterProperties::builder().build(); + let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap(); + writer.write(&batch).unwrap(); + writer.flush().unwrap(); + writer.close().unwrap(); + + } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index a0637f379610..190e733b42c3 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -280,7 +280,7 @@ fn find_column_index( .columns() .iter() .enumerate() - .find(|(_idx, c)| c.column_descr().name() == column.name()) + .find(|(_idx, c)| c.column_descr().name() == column.name() && c.column_descr().path().parts().len() == 0) .map(|(idx, _c)| idx); if col_idx.is_none() { From 74e44e06b5cef4218000e74f9f54ebb4a69313ab Mon Sep 17 00:00:00 2001 From: manoj-inukolunu Date: Sat, 20 Jan 2024 23:00:17 +0530 Subject: [PATCH 2/7] use parquet_column instead of find_column_index. --- .../datasource/physical_plan/parquet/mod.rs | 2 +- .../physical_plan/parquet/page_filter.rs | 36 +++++++++---------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index b42107384d97..90bf7df8d7b2 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -550,7 +550,7 @@ impl FileOpener for ParquetOpener { if enable_page_index && !row_groups.is_empty() { if let Some(p) = page_pruning_predicate { let pruned = - p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?; + p.prune(&file_schema, builder.parquet_schema(), &row_groups, file_metadata.as_ref(), &file_metrics)?; if let Some(row_selection) = pruned { builder = builder.with_row_selection(row_selection); } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 190e733b42c3..2c16473fa4bf 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -27,7 +27,7 @@ use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; use log::{debug, trace}; -use parquet::schema::types::ColumnDescriptor; +use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{ arrow::arrow_reader::{RowSelection, RowSelector}, errors::ParquetError, @@ -39,9 +39,10 @@ use parquet::{ }; use std::collections::HashSet; use std::sync::Arc; +use arrow_schema::Schema; use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; -use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128; +use crate::datasource::physical_plan::parquet::statistics::{from_bytes_to_i128, parquet_column}; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; @@ -128,6 +129,8 @@ impl PagePruningPredicate { /// Returns a [`RowSelection`] for the given file pub fn prune( &self, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, row_groups: &[usize], file_metadata: &ParquetMetaData, file_metrics: &ParquetFileMetrics, @@ -164,17 +167,22 @@ impl PagePruningPredicate { let mut row_selections = Vec::with_capacity(page_index_predicates.len()); for predicate in page_index_predicates { // find column index by looking in the row group metadata. - let col_idx = find_column_index(predicate, &groups[0]); + let name = find_column_name(predicate); + let mut parquet_col = None; + if name.is_some(){ + parquet_col = parquet_column(parquet_schema, arrow_schema, name.unwrap().as_str()); + } let mut selectors = Vec::with_capacity(row_groups.len()); for r in row_groups.iter() { let row_group_metadata = &groups[*r]; let rg_offset_indexes = file_offset_indexes.get(*r); let rg_page_indexes = file_page_indexes.get(*r); - if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = - (rg_page_indexes, rg_offset_indexes, col_idx) + if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_ref)) = + (rg_page_indexes, rg_offset_indexes, parquet_col) { + let col_idx = col_ref.0; selectors.extend( prune_pages_in_one_row_group( row_group_metadata, @@ -249,10 +257,9 @@ impl PagePruningPredicate { /// that `extract_page_index_push_down_predicates` only return /// predicate with one col) /// -fn find_column_index( +fn find_column_name( predicate: &PruningPredicate, - row_group_metadata: &RowGroupMetaData, -) -> Option { +) -> Option { let mut found_required_column: Option<&Column> = None; for required_column_details in predicate.required_columns().iter() { @@ -276,18 +283,7 @@ fn find_column_index( return None; }; - let col_idx = row_group_metadata - .columns() - .iter() - .enumerate() - .find(|(_idx, c)| c.column_descr().name() == column.name() && c.column_descr().path().parts().len() == 0) - .map(|(idx, _c)| idx); - - if col_idx.is_none() { - trace!("Can not find column {} in row group meta", column.name()); - } - - col_idx + Some(column.name().to_string()) } /// Intersects the [`RowSelector`]s From 84f8550c6350c4e1d027c103c8bef76aa438ccd7 Mon Sep 17 00:00:00 2001 From: manoj-inukolunu Date: Sat, 20 Jan 2024 23:02:12 +0530 Subject: [PATCH 3/7] Remove unused struct --- datafusion/core/src/datasource/physical_plan/parquet/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 90bf7df8d7b2..72190912c582 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -782,7 +782,7 @@ mod tests { array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field, SchemaBuilder}, }; - use arrow_array::{Date64Array, GenericStringArray, StructArray}; + use arrow_array::{Date64Array, StructArray}; use chrono::{TimeZone, Utc}; use datafusion_common::{assert_contains, ToDFSchema}; use datafusion_common::{FileType, GetExt, ScalarValue}; From 9b378d1d1eaa15f6b2d748e280dbe280fde6fe67 Mon Sep 17 00:00:00 2001 From: manoj-inukolunu Date: Sun, 21 Jan 2024 21:27:30 +0530 Subject: [PATCH 4/7] Fix formatting issues. --- .../datasource/physical_plan/parquet/mod.rs | 37 +++++++++++-------- .../physical_plan/parquet/page_filter.rs | 15 ++++---- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index e0ad1f612721..aed7a9be90f9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -549,8 +549,13 @@ impl FileOpener for ParquetOpener { // with that range can be skipped as well if enable_page_index && !row_groups.is_empty() { if let Some(p) = page_pruning_predicate { - let pruned = - p.prune(&file_schema, builder.parquet_schema(), &row_groups, file_metadata.as_ref(), &file_metrics)?; + let pruned = p.prune( + &file_schema, + builder.parquet_schema(), + &row_groups, + file_metadata.as_ref(), + &file_metrics, + )?; if let Some(row_selection) = pruned { builder = builder.with_row_selection(row_selection); } @@ -782,7 +787,9 @@ mod tests { array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field, SchemaBuilder}, }; + use arrow_array::cast::AsArray; use arrow_array::{Date64Array, StructArray}; + use arrow_schema::Fields; use chrono::{TimeZone, Utc}; use datafusion_common::{assert_contains, ToDFSchema}; use datafusion_common::{FileType, GetExt, ScalarValue}; @@ -793,11 +800,9 @@ mod tests { use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::ObjectMeta; + use parquet::arrow::ArrowWriter; use std::fs::{self, File}; use std::io::Write; - use arrow_array::cast::AsArray; - use arrow_schema::Fields; - use parquet::arrow::ArrowWriter; use tempfile::TempDir; use url::Url; @@ -1768,7 +1773,7 @@ mod tests { // assert the batches and some metrics #[rustfmt::skip] - let expected = ["+-----+", + let expected = ["+-----+", "| int |", "+-----+", "| 4 |", @@ -2140,11 +2145,10 @@ mod tests { create_physical_expr(expr, &df_schema, &execution_props).unwrap() } - #[tokio::test] async fn test_struct_filter_parquet() -> Result<()> { let tmp_dir = TempDir::new()?; - let path =tmp_dir.path().to_str().unwrap().to_string()+"/test.parquet"; + let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet"; write_file(&path); let ctx = SessionContext::new(); let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); @@ -2153,12 +2157,14 @@ mod tests { .unwrap(); let sql = "select * from base_table where name='test02'"; let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap(); - assert_eq!(batch.len(),1); - let expected = ["+---------------------+----+--------+", - "| struct | id | name |", - "+---------------------+----+--------+", - "| {id: 4, name: aaa2} | 2 | test02 |", - "+---------------------+----+--------+"]; + assert_eq!(batch.len(), 1); + let expected = [ + "+---------------------+----+--------+", + "| struct | id | name |", + "+---------------------+----+--------+", + "| {id: 4, name: aaa2} | 2 | test02 |", + "+---------------------+----+--------+", + ]; crate::assert_batches_eq!(expected, &batch); Ok(()) } @@ -2191,13 +2197,12 @@ mod tests { Arc::new(name_array), ], ) - .unwrap(); + .unwrap(); let file = File::create(file).unwrap(); let w_opt = WriterProperties::builder().build(); let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap(); writer.write(&batch).unwrap(); writer.flush().unwrap(); writer.close().unwrap(); - } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 2c16473fa4bf..761c3446a055 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -23,6 +23,7 @@ use arrow::array::{ }; use arrow::datatypes::DataType; use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError}; +use arrow_schema::Schema; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; @@ -39,10 +40,11 @@ use parquet::{ }; use std::collections::HashSet; use std::sync::Arc; -use arrow_schema::Schema; use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type; -use crate::datasource::physical_plan::parquet::statistics::{from_bytes_to_i128, parquet_column}; +use crate::datasource::physical_plan::parquet::statistics::{ + from_bytes_to_i128, parquet_column, +}; use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; @@ -170,8 +172,9 @@ impl PagePruningPredicate { let name = find_column_name(predicate); let mut parquet_col = None; - if name.is_some(){ - parquet_col = parquet_column(parquet_schema, arrow_schema, name.unwrap().as_str()); + if name.is_some() { + parquet_col = + parquet_column(parquet_schema, arrow_schema, name.unwrap().as_str()); } let mut selectors = Vec::with_capacity(row_groups.len()); for r in row_groups.iter() { @@ -257,9 +260,7 @@ impl PagePruningPredicate { /// that `extract_page_index_push_down_predicates` only return /// predicate with one col) /// -fn find_column_name( - predicate: &PruningPredicate, -) -> Option { +fn find_column_name(predicate: &PruningPredicate) -> Option { let mut found_required_column: Option<&Column> = None; for required_column_details in predicate.required_columns().iter() { From f373ba91f91f244521eb68b0b0fca0fe8f2547b9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 22 Jan 2024 15:18:32 -0500 Subject: [PATCH 5/7] Simplify struct field resolution --- .../datasource/physical_plan/parquet/mod.rs | 3 +- .../physical_plan/parquet/page_filter.rs | 33 ++++++++----------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index aed7a9be90f9..e3bfa71a81e3 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -787,8 +787,7 @@ mod tests { array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field, SchemaBuilder}, }; - use arrow_array::cast::AsArray; - use arrow_array::{Date64Array, StructArray}; + use arrow_array::{Date64Array, StructArray}; use arrow_schema::Fields; use chrono::{TimeZone, Utc}; use datafusion_common::{assert_contains, ToDFSchema}; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 761c3446a055..d75f495bfdff 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -168,24 +168,17 @@ impl PagePruningPredicate { let mut row_selections = Vec::with_capacity(page_index_predicates.len()); for predicate in page_index_predicates { - // find column index by looking in the row group metadata. - - let name = find_column_name(predicate); - let mut parquet_col = None; - if name.is_some() { - parquet_col = - parquet_column(parquet_schema, arrow_schema, name.unwrap().as_str()); - } + // find column index in the parquet schema + let col_idx = find_column_index(predicate, arrow_schema, parquet_schema); let mut selectors = Vec::with_capacity(row_groups.len()); for r in row_groups.iter() { let row_group_metadata = &groups[*r]; let rg_offset_indexes = file_offset_indexes.get(*r); let rg_page_indexes = file_page_indexes.get(*r); - if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_ref)) = - (rg_page_indexes, rg_offset_indexes, parquet_col) + if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) = + (rg_page_indexes, rg_offset_indexes, col_idx) { - let col_idx = col_ref.0; selectors.extend( prune_pages_in_one_row_group( row_group_metadata, @@ -242,7 +235,7 @@ impl PagePruningPredicate { } } -/// Returns the column index in the row group metadata for the single +/// Returns the column index in the row parquet schema for the single /// column of a single column pruning predicate. /// /// For example, give the predicate `y > 5` @@ -257,10 +250,13 @@ impl PagePruningPredicate { /// Panics: /// /// If the predicate contains more than one column reference (assumes -/// that `extract_page_index_push_down_predicates` only return +/// that `extract_page_index_push_down_predicates` only returns /// predicate with one col) -/// -fn find_column_name(predicate: &PruningPredicate) -> Option { +fn find_column_index( + predicate: &PruningPredicate, + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor +) -> Option { let mut found_required_column: Option<&Column> = None; for required_column_details in predicate.required_columns().iter() { @@ -277,14 +273,13 @@ fn find_column_name(predicate: &PruningPredicate) -> Option { } } - let column = if let Some(found_required_column) = found_required_column.as_ref() { - found_required_column - } else { + let Some(column) = found_required_column.as_ref() else { trace!("No column references in pruning predicate"); return None; }; - Some(column.name().to_string()) + parquet_column(parquet_schema, arrow_schema, column.name()) + .map(|x| x.0) } /// Intersects the [`RowSelector`]s From ea482a5a03208e8fa8dcd5373fcde0312f2c0487 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 22 Jan 2024 15:19:55 -0500 Subject: [PATCH 6/7] fix formatting --- datafusion/core/src/datasource/physical_plan/parquet/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index e3bfa71a81e3..1852a11c9fb9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -1772,12 +1772,14 @@ mod tests { // assert the batches and some metrics #[rustfmt::skip] - let expected = ["+-----+", + let expected = [ + "+-----+", "| int |", "+-----+", "| 4 |", "| 5 |", - "+-----+"]; + "+-----+" + ]; assert_batches_sorted_eq!(expected, &rt.batches.unwrap()); assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4); assert!( From 6a9d76b99b45b70b8584763969212e82e4e7c746 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 24 Jan 2024 06:44:46 -0500 Subject: [PATCH 7/7] fmt --- datafusion/core/src/datasource/physical_plan/parquet/mod.rs | 2 +- .../core/src/datasource/physical_plan/parquet/page_filter.rs | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 1852a11c9fb9..7215cdd60716 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -787,7 +787,7 @@ mod tests { array::{Int64Array, Int8Array, StringArray}, datatypes::{DataType, Field, SchemaBuilder}, }; - use arrow_array::{Date64Array, StructArray}; + use arrow_array::{Date64Array, StructArray}; use arrow_schema::Fields; use chrono::{TimeZone, Utc}; use datafusion_common::{assert_contains, ToDFSchema}; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index d75f495bfdff..f0a8e6608990 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -255,7 +255,7 @@ impl PagePruningPredicate { fn find_column_index( predicate: &PruningPredicate, arrow_schema: &Schema, - parquet_schema: &SchemaDescriptor + parquet_schema: &SchemaDescriptor, ) -> Option { let mut found_required_column: Option<&Column> = None; @@ -278,8 +278,7 @@ fn find_column_index( return None; }; - parquet_column(parquet_schema, arrow_schema, column.name()) - .map(|x| x.0) + parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0) } /// Intersects the [`RowSelector`]s