From 41a61f0cabc9a4395bda501960424edf039ef982 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 23 Jan 2024 11:42:17 +0800 Subject: [PATCH] revert data gen and add check --- .../physical_plan/parquet/row_groups.rs | 14 +- datafusion/core/src/test_util/parquet.rs | 3 - .../core/tests/parquet/filter_pushdown.rs | 131 ------------------ datafusion/core/tests/parquet/mod.rs | 28 ++++ .../core/tests/parquet/row_group_pruning.rs | 33 +++++ parquet-testing | 2 +- test-utils/src/data_gen.rs | 52 +------ testing | 2 +- 8 files changed, 80 insertions(+), 185 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index e3b2529ad869..3a6bf1723096 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -19,6 +19,7 @@ use arrow::{array::ArrayRef, datatypes::Schema}; use arrow_array::BooleanArray; use arrow_schema::FieldRef; use datafusion_common::{Column, ScalarValue}; +use num_traits::ToBytes; use parquet::basic::Type; use parquet::data_type::Decimal; use parquet::file::metadata::ColumnChunkMetaData; @@ -228,6 +229,13 @@ impl PruningStatistics for BloomFilterStatistics { Type::INT32 => { //https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42 // All physical type are little-endian + if *p > 9 { + //DECIMAL can be used to annotate the following types: + // + // int32: for 1 <= precision <= 9 + // int64: for 1 <= precision <= 18 + return true; + } let b = (*v as i32).to_le_bytes(); let decimal = Decimal::Int32 { value: b, @@ -237,6 +245,9 @@ impl PruningStatistics for BloomFilterStatistics { sbbf.check(&decimal) } Type::INT64 => { + if *p > 18 { + return true; + } let b = (*v as i64).to_le_bytes(); let decimal = Decimal::Int64 { value: b, @@ -246,7 +257,8 @@ impl PruningStatistics for BloomFilterStatistics { sbbf.check(&decimal) } Type::FIXED_LEN_BYTE_ARRAY => { - let b = v.to_le_bytes().to_vec(); + // keep with from_bytes_to_i128 + let b = v.to_be_bytes().to_vec(); let decimal = Decimal::Bytes { value: b.into(), precision: *p as i32, diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 8ccbc0270d1c..1047c3dd4e48 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -60,8 +60,6 @@ pub struct ParquetScanOptions { pub reorder_filters: bool, /// enable page index pub enable_page_index: bool, - /// enable bloom filter - pub enable_bloom_filter: bool, } impl ParquetScanOptions { @@ -71,7 +69,6 @@ impl ParquetScanOptions { config.execution.parquet.pushdown_filters = self.pushdown_filters; config.execution.parquet.reorder_filters = self.reorder_filters; config.execution.parquet.enable_page_index = self.enable_page_index; - config.execution.parquet.bloom_filter_enabled = self.enable_bloom_filter; config.into() } } diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 02da85647820..f214e8903a4f 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -34,7 +34,6 @@ use datafusion::physical_plan::collect; use datafusion::physical_plan::metrics::MetricsSet; use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext}; use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile}; -use datafusion_common::ScalarValue; use datafusion_expr::utils::{conjunction, disjunction, split_conjunction}; use itertools::Itertools; use parquet::file::properties::WriterProperties; @@ -339,59 +338,6 @@ async fn single_file_small_data_pages() { .await; } -#[cfg(not(target_family = "windows"))] -#[tokio::test] -async fn single_file_with_bloom_filter() { - let tempdir = TempDir::new().unwrap(); - - let props = WriterProperties::builder() - .set_bloom_filter_enabled(true) - .build(); - let test_parquet_file = generate_file(&tempdir, props); - - TestCase::new(&test_parquet_file) - .with_name("bloom_filter_on_decimal128_int32") - // predicate is chosen carefully to prune row groups id from 0 to 99 except 50 - // decimal128_id_int32 = 50 - .with_filter( - col("decimal128_id_int32") - .in_list(vec![lit(ScalarValue::Decimal128(Some(50), 8, 0))], false), - ) - .with_bloom_filter_filtering_expected(BloomFilterFilteringExpected::Some) - .with_pushdown_expected(PushdownExpected::None) - .with_expected_rows(0) - .run() - .await; - - TestCase::new(&test_parquet_file) - .with_name("bloom_filter_on_decimal128_int64") - // predicate is chosen carefully to prune row groups id from 0 to 99 except 50 - // decimal128_id_int64 = 50 - .with_filter( - col("decimal128_id_int64") - .in_list(vec![lit(ScalarValue::Decimal128(Some(50), 12, 0))], false), - ) - .with_bloom_filter_filtering_expected(BloomFilterFilteringExpected::Some) - .with_pushdown_expected(PushdownExpected::None) - .with_expected_rows(0) - .run() - .await; - - TestCase::new(&test_parquet_file) - .with_name("bloom_filter_on_decimal128_fixed_len_byte_array") - // predicate is chosen carefully to prune row groups id from 0 to 99 except 50 - // decimal128_id_fixed_len_byte = 50 - .with_filter( - col("decimal128_id_fixed_len_byte") - .in_list(vec![lit(ScalarValue::Decimal128(Some(50), 38, 0))], false), - ) - .with_bloom_filter_filtering_expected(BloomFilterFilteringExpected::Some) - .with_pushdown_expected(PushdownExpected::None) - .with_expected_rows(0) - .run() - .await; -} - /// Expected pushdown behavior #[derive(Debug, Clone, Copy)] enum PushdownExpected { @@ -410,14 +356,6 @@ enum PageIndexFilteringExpected { Some, } -#[derive(Debug, Clone, Copy)] -enum BloomFilterFilteringExpected { - /// Did not expect filter pushdown to filter any rows - None, - /// Expected that more than 0 were pruned - Some, -} - /// parameters for running a test struct TestCase<'a> { test_parquet_file: &'a TestParquetFile, @@ -431,9 +369,6 @@ struct TestCase<'a> { /// Did we expect page filtering to filter out pages page_index_filtering_expected: PageIndexFilteringExpected, - /// Did we expect bloom filter filtering to filter out row - bloom_filter_filtering_expected: BloomFilterFilteringExpected, - /// How many rows are expected to pass the predicate overall? expected_rows: usize, } @@ -447,7 +382,6 @@ impl<'a> TestCase<'a> { filter: lit(true), pushdown_expected: PushdownExpected::None, page_index_filtering_expected: PageIndexFilteringExpected::None, - bloom_filter_filtering_expected: BloomFilterFilteringExpected::None, expected_rows: 0, } } @@ -478,15 +412,6 @@ impl<'a> TestCase<'a> { self } - /// Set the expected bloom filyer filtering - fn with_bloom_filter_filtering_expected( - mut self, - v: BloomFilterFilteringExpected, - ) -> Self { - self.bloom_filter_filtering_expected = v; - self - } - /// Set the number of expected rows (to ensure the predicates have /// a good range of selectivity fn with_expected_rows(mut self, expected_rows: usize) -> Self { @@ -517,7 +442,6 @@ impl<'a> TestCase<'a> { pushdown_filters: false, reorder_filters: false, enable_page_index: false, - enable_bloom_filter: false, }, filter, ) @@ -529,7 +453,6 @@ impl<'a> TestCase<'a> { pushdown_filters: true, reorder_filters: false, enable_page_index: false, - enable_bloom_filter: false, }, filter, ) @@ -543,7 +466,6 @@ impl<'a> TestCase<'a> { pushdown_filters: true, reorder_filters: true, enable_page_index: false, - enable_bloom_filter: false, }, filter, ) @@ -557,7 +479,6 @@ impl<'a> TestCase<'a> { pushdown_filters: false, reorder_filters: false, enable_page_index: true, - enable_bloom_filter: false, }, filter, ) @@ -570,40 +491,12 @@ impl<'a> TestCase<'a> { pushdown_filters: true, reorder_filters: true, enable_page_index: true, - enable_bloom_filter: false, }, filter, ) .await; assert_eq!(no_pushdown, pushdown_reordering_and_page_index); - - let bloom_filter_only = self - .read_with_options( - ParquetScanOptions { - pushdown_filters: false, - reorder_filters: false, - enable_page_index: false, - enable_bloom_filter: true, - }, - filter, - ) - .await; - assert_eq!(no_pushdown, bloom_filter_only); - - let pushdown_reordering_and_bloom_filter = self - .read_with_options( - ParquetScanOptions { - pushdown_filters: false, - reorder_filters: true, - enable_page_index: false, - enable_bloom_filter: true, - }, - filter, - ) - .await; - - assert_eq!(no_pushdown, pushdown_reordering_and_bloom_filter); } /// Reads data from a test parquet file using the specified scan options @@ -686,30 +579,6 @@ impl<'a> TestCase<'a> { } }; - let row_groups_pruned_bloom_filter = - get_value(&metrics, "row_groups_pruned_bloom_filter"); - println!(" row_groups_pruned_bloom_filter: {row_groups_pruned_bloom_filter}"); - - let bloom_filter_filtering_expected = if scan_options.enable_bloom_filter { - self.bloom_filter_filtering_expected - } else { - // if bloom filter filtering is not enabled, don't expect it - // to filter rows - BloomFilterFilteringExpected::None - }; - - match bloom_filter_filtering_expected { - BloomFilterFilteringExpected::None => { - assert_eq!(row_groups_pruned_bloom_filter, 0); - } - BloomFilterFilteringExpected::Some => { - assert!( - row_groups_pruned_bloom_filter > 0, - "Expected to filter rows via bloom filter but none were", - ); - } - }; - batch } } diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 0602b4d4c525..7d879cebe1e1 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -66,7 +66,10 @@ enum Scenario { Int32Range, Float64, Decimal, + DecimalBloomFilterInt32, + DecimalBloomFilterInt64, DecimalLargePrecision, + DecimalLargePrecisionBloomFilter, PeriodsInColumnNames, } @@ -549,6 +552,22 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2), ] } + Scenario::DecimalBloomFilterInt32 => { + // decimal record batch + vec![ + make_decimal_batch(vec![100, 200, 300, 400, 500], 6, 2), + make_decimal_batch(vec![100, 200, 300, 400, 600], 6, 2), + make_decimal_batch(vec![100, 200, 300, 400, 600], 6, 2), + ] + } + Scenario::DecimalBloomFilterInt64 => { + // decimal record batch + vec![ + make_decimal_batch(vec![100, 200, 300, 400, 500], 9, 2), + make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2), + make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2), + ] + } Scenario::DecimalLargePrecision => { // decimal record batch with large precision, // and the data will stored as FIXED_LENGTH_BYTE_ARRAY @@ -558,6 +577,15 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2), ] } + Scenario::DecimalLargePrecisionBloomFilter => { + // decimal record batch with large precision, + // and the data will stored as FIXED_LENGTH_BYTE_ARRAY + vec![ + make_decimal_batch(vec![100, 200, 300, 400, 500], 38, 2), + make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2), + make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2), + ] + } Scenario::PeriodsInColumnNames => { vec![ // all frontend diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index fc1b66efed87..94aa92c178cc 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -578,6 +578,39 @@ async fn prune_decimal_in_list() { 6, ) .await; + + // test data -> r1: {1,2,3,4,5}, r2: {1,2,3,4,6}, r3: {1,2,3,4,6} + test_row_group_prune( + Scenario::DecimalBloomFilterInt32, + "SELECT * FROM t where decimal_col in (5)", + Some(0), + Some(0), + Some(2), + 1, + ) + .await; + + // test data -> r1: {1,2,3,4,5}, r2: {1,2,3,4,6}, r3: {1,2,3,4,6} + test_row_group_prune( + Scenario::DecimalBloomFilterInt64, + "SELECT * FROM t where decimal_col in (5)", + Some(0), + Some(0), + Some(2), + 1, + ) + .await; + + // test data -> r1: {1,2,3,4,5}, r2: {1,2,3,4,6}, r3: {1,2,3,4,6} + test_row_group_prune( + Scenario::DecimalLargePrecisionBloomFilter, + "SELECT * FROM t where decimal_col in (5)", + Some(0), + Some(0), + Some(2), + 1, + ) + .await; } #[tokio::test] diff --git a/parquet-testing b/parquet-testing index e45cd23f784a..d79a0101d90d 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit e45cd23f784aab3d6bf0701f8f4e621469ed3be7 +Subproject commit d79a0101d90dfa3bbb10337626f57a3e8c4b5363 diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs index 61788774435c..f5ed8510a79e 100644 --- a/test-utils/src/data_gen.rs +++ b/test-utils/src/data_gen.rs @@ -62,10 +62,7 @@ struct BatchBuilder { request_bytes: Int32Builder, response_bytes: Int32Builder, response_status: UInt16Builder, - decimal128_price: Decimal128Builder, - decimal128_id_int32: Decimal128Builder, - decimal128_id_int64: Decimal128Builder, - decimal128_id_fixed_len_byte: Decimal128Builder, + prices_status: Decimal128Builder, options: GeneratorOptions, row_count: usize, @@ -95,14 +92,7 @@ impl BatchBuilder { Field::new("request_bytes", DataType::Int32, true), Field::new("response_bytes", DataType::Int32, true), Field::new("response_status", DataType::UInt16, false), - Field::new("decimal128_price", DataType::Decimal128(38, 0), false), - Field::new("decimal128_id_int32", DataType::Decimal128(8, 0), false), - Field::new("decimal128_id_int64", DataType::Decimal128(12, 0), false), - Field::new( - "decimal128_id_fixed_len_byte", - DataType::Decimal128(38, 0), - false, - ), + Field::new("decimal_price", DataType::Decimal128(38, 0), false), ])) } @@ -179,19 +169,7 @@ impl BatchBuilder { .append_option(rng.gen_bool(0.9).then(|| rng.gen())); self.response_status .append_value(status[rng.gen_range(0..status.len())]); - self.decimal128_price.append_value(self.row_count as i128); - - let val = rng.gen_range(0..100); - // Avoid row group min_max to prune this, so skip 50 between 0..99 - if val == 50 { - self.decimal128_id_int32.append_value(1); - self.decimal128_id_int64.append_value(1); - self.decimal128_id_fixed_len_byte.append_value(1); - } else { - self.decimal128_id_int32.append_value(val); - self.decimal128_id_int64.append_value(val); - self.decimal128_id_fixed_len_byte.append_value(val); - } + self.prices_status.append_value(self.row_count as i128); } fn finish(mut self, schema: SchemaRef) -> RecordBatch { @@ -213,25 +191,7 @@ impl BatchBuilder { Arc::new(self.response_bytes.finish()), Arc::new(self.response_status.finish()), Arc::new( - self.decimal128_price - .finish() - .with_precision_and_scale(38, 0) - .unwrap(), - ), - Arc::new( - self.decimal128_id_int32 - .finish() - .with_precision_and_scale(8, 0) - .unwrap(), - ), - Arc::new( - self.decimal128_id_int64 - .finish() - .with_precision_and_scale(12, 0) - .unwrap(), - ), - Arc::new( - self.decimal128_id_fixed_len_byte + self.prices_status .finish() .with_precision_and_scale(38, 0) .unwrap(), @@ -285,10 +245,6 @@ fn generate_sorted_strings( /// request_bytes: -312099516 /// response_bytes: 1448834362 /// response_status: 200 -/// decimal128_price: 1000, -/// decimal128_id_int32: 5, -/// decimal128_id_int64: 5, -/// decimal128_id_fixed_len_byte: 5, /// ``` #[derive(Debug)] pub struct AccessLogGenerator { diff --git a/testing b/testing index 98fceecd024d..e81d0c6de359 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 98fceecd024dccd2f8a00e32fc144975f218acf4 +Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e