Commit 41a61f0

revert data gen and add check

Ted-Jiang committed Jan 23, 2024
1 parent 9527792 commit 41a61f0
Showing 8 changed files with 80 additions and 185 deletions.
@@ -19,6 +19,7 @@ use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
use datafusion_common::{Column, ScalarValue};
use num_traits::ToBytes;
use parquet::basic::Type;
use parquet::data_type::Decimal;
use parquet::file::metadata::ColumnChunkMetaData;
@@ -228,6 +229,13 @@ impl PruningStatistics for BloomFilterStatistics {
Type::INT32 => {
// https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
// All physical types are little-endian
if *p > 9 {
// DECIMAL can be used to annotate the following types:
//
//   int32: for 1 <= precision <= 9
//   int64: for 1 <= precision <= 18
//
// A precision above 9 cannot fit in INT32, so conservatively treat
// the value as possibly present and keep the row group.
return true;
}
let b = (*v as i32).to_le_bytes();
let decimal = Decimal::Int32 {
value: b,
@@ -237,6 +245,9 @@ impl PruningStatistics for BloomFilterStatistics {
sbbf.check(&decimal)
}
Type::INT64 => {
if *p > 18 {
// A precision above 18 cannot fit in INT64; conservatively keep
// the row group.
return true;
}
let b = (*v as i64).to_le_bytes();
let decimal = Decimal::Int64 {
value: b,
@@ -246,7 +257,8 @@ impl PruningStatistics for BloomFilterStatistics {
sbbf.check(&decimal)
}
Type::FIXED_LEN_BYTE_ARRAY => {
let b = v.to_le_bytes().to_vec();
// big-endian, to stay consistent with from_bytes_to_i128
let b = v.to_be_bytes().to_vec();
let decimal = Decimal::Bytes {
value: b.into(),
precision: *p as i32,
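For readers skimming the diff, here is a minimal self-contained sketch of the check this hunk introduces. It leans only on types the hunk itself uses (`parquet::bloom_filter::Sbbf`, `parquet::basic::Type`, `parquet::data_type::Decimal`); the function name and the `main` demo are hypothetical:

```rust
use parquet::basic::Type;
use parquet::bloom_filter::Sbbf;
use parquet::data_type::Decimal;

/// Probe `sbbf` for a Decimal128 value `v` with precision `p` and scale `s`,
/// given the column's physical type. Returning `true` means "might be
/// present", the conservative answer that keeps the row group.
fn decimal_might_be_present(sbbf: &Sbbf, physical: Type, v: i128, p: u8, s: i8) -> bool {
    match physical {
        Type::INT32 => {
            if p > 9 {
                return true; // INT32 decimals only cover precision 1..=9
            }
            sbbf.check(&Decimal::Int32 {
                value: (v as i32).to_le_bytes(),
                precision: p as i32,
                scale: s as i32,
            })
        }
        Type::INT64 => {
            if p > 18 {
                return true; // INT64 decimals only cover precision 1..=18
            }
            sbbf.check(&Decimal::Int64 {
                value: (v as i64).to_le_bytes(),
                precision: p as i32,
                scale: s as i32,
            })
        }
        Type::FIXED_LEN_BYTE_ARRAY => sbbf.check(&Decimal::Bytes {
            // big-endian, matching how from_bytes_to_i128 reads values back
            value: v.to_be_bytes().to_vec().into(),
            precision: p as i32,
            scale: s as i32,
        }),
        _ => true, // unexpected physical layout: never prune
    }
}

fn main() {
    // Why the FIXED_LEN_BYTE_ARRAY arm switched to big-endian: the bytes
    // hashed into the filter must match the bytes the writer stored.
    // 5.00 at scale 2 is the integer 500 (0x01F4).
    let v: i128 = 500;
    assert_eq!(v.to_le_bytes()[0..2], [0xF4, 0x01]);
    assert_eq!(v.to_be_bytes()[14..16], [0x01, 0xF4]);
}
```

The `main` body is the endianness argument in miniature: hashing the little-endian bytes would probe for a byte string the writer never stored, so the filter could wrongly report "absent" and prune row groups that actually contain the value.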
3 changes: 0 additions & 3 deletions datafusion/core/src/test_util/parquet.rs
@@ -60,8 +60,6 @@ pub struct ParquetScanOptions {
pub reorder_filters: bool,
/// enable page index
pub enable_page_index: bool,
/// enable bloom filter
pub enable_bloom_filter: bool,
}

impl ParquetScanOptions {
@@ -71,7 +69,6 @@ impl ParquetScanOptions {
config.execution.parquet.pushdown_filters = self.pushdown_filters;
config.execution.parquet.reorder_filters = self.reorder_filters;
config.execution.parquet.enable_page_index = self.enable_page_index;
config.execution.parquet.bloom_filter_enabled = self.enable_bloom_filter;
config.into()
}
}
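With the struct field gone, bloom-filter reads are controlled solely by the session-level option that the removed line used to set. A minimal sketch of toggling it, assuming the DataFusion session APIs of this era (`options_mut`, `new_with_config`):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Build a context whose parquet reads consult bloom filters; the option
// path mirrors `config.execution.parquet.bloom_filter_enabled` above.
fn context_with_bloom_filter_reads() -> SessionContext {
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.bloom_filter_enabled = true;
    SessionContext::new_with_config(config)
}
```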
131 changes: 0 additions & 131 deletions datafusion/core/tests/parquet/filter_pushdown.rs
@@ -34,7 +34,6 @@ use datafusion::physical_plan::collect;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
use datafusion_common::ScalarValue;
use datafusion_expr::utils::{conjunction, disjunction, split_conjunction};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
@@ -339,59 +338,6 @@ async fn single_file_small_data_pages() {
.await;
}

#[cfg(not(target_family = "windows"))]
#[tokio::test]
async fn single_file_with_bloom_filter() {
let tempdir = TempDir::new().unwrap();

let props = WriterProperties::builder()
.set_bloom_filter_enabled(true)
.build();
let test_parquet_file = generate_file(&tempdir, props);

TestCase::new(&test_parquet_file)
.with_name("bloom_filter_on_decimal128_int32")
// predicate is chosen carefully to prune every row group (ids 0 to 99) except id 50
// decimal128_id_int32 = 50
.with_filter(
col("decimal128_id_int32")
.in_list(vec![lit(ScalarValue::Decimal128(Some(50), 8, 0))], false),
)
.with_bloom_filter_filtering_expected(BloomFilterFilteringExpected::Some)
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(0)
.run()
.await;

TestCase::new(&test_parquet_file)
.with_name("bloom_filter_on_decimal128_int64")
// predicate is chosen carefully to prune every row group (ids 0 to 99) except id 50
// decimal128_id_int64 = 50
.with_filter(
col("decimal128_id_int64")
.in_list(vec![lit(ScalarValue::Decimal128(Some(50), 12, 0))], false),
)
.with_bloom_filter_filtering_expected(BloomFilterFilteringExpected::Some)
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(0)
.run()
.await;

TestCase::new(&test_parquet_file)
.with_name("bloom_filter_on_decimal128_fixed_len_byte_array")
// predicate is chosen carefully to prune every row group (ids 0 to 99) except id 50
// decimal128_id_fixed_len_byte = 50
.with_filter(
col("decimal128_id_fixed_len_byte")
.in_list(vec![lit(ScalarValue::Decimal128(Some(50), 38, 0))], false),
)
.with_bloom_filter_filtering_expected(BloomFilterFilteringExpected::Some)
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(0)
.run()
.await;
}

/// Expected pushdown behavior
#[derive(Debug, Clone, Copy)]
enum PushdownExpected {
@@ -410,14 +356,6 @@ enum PageIndexFilteringExpected {
Some,
}

#[derive(Debug, Clone, Copy)]
enum BloomFilterFilteringExpected {
/// Did not expect filter pushdown to filter any rows
None,
/// Expected that more than 0 were pruned
Some,
}

/// parameters for running a test
struct TestCase<'a> {
test_parquet_file: &'a TestParquetFile,
@@ -431,9 +369,6 @@ struct TestCase<'a> {
/// Did we expect page filtering to filter out pages
page_index_filtering_expected: PageIndexFilteringExpected,

/// Did we expect bloom filter filtering to filter out rows
bloom_filter_filtering_expected: BloomFilterFilteringExpected,

/// How many rows are expected to pass the predicate overall?
expected_rows: usize,
}
@@ -447,7 +382,6 @@ impl<'a> TestCase<'a> {
filter: lit(true),
pushdown_expected: PushdownExpected::None,
page_index_filtering_expected: PageIndexFilteringExpected::None,
bloom_filter_filtering_expected: BloomFilterFilteringExpected::None,
expected_rows: 0,
}
}
@@ -478,15 +412,6 @@ impl<'a> TestCase<'a> {
self
}

/// Set the expected bloom filter filtering
fn with_bloom_filter_filtering_expected(
mut self,
v: BloomFilterFilteringExpected,
) -> Self {
self.bloom_filter_filtering_expected = v;
self
}

/// Set the number of expected rows (to ensure the predicates have
/// a good range of selectivity)
fn with_expected_rows(mut self, expected_rows: usize) -> Self {
@@ -517,7 +442,6 @@ impl<'a> TestCase<'a> {
pushdown_filters: false,
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
},
filter,
)
@@ -529,7 +453,6 @@ impl<'a> TestCase<'a> {
pushdown_filters: true,
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
},
filter,
)
@@ -543,7 +466,6 @@ impl<'a> TestCase<'a> {
pushdown_filters: true,
reorder_filters: true,
enable_page_index: false,
enable_bloom_filter: false,
},
filter,
)
@@ -557,7 +479,6 @@ impl<'a> TestCase<'a> {
pushdown_filters: false,
reorder_filters: false,
enable_page_index: true,
enable_bloom_filter: false,
},
filter,
)
@@ -570,40 +491,12 @@ impl<'a> TestCase<'a> {
pushdown_filters: true,
reorder_filters: true,
enable_page_index: true,
enable_bloom_filter: false,
},
filter,
)
.await;

assert_eq!(no_pushdown, pushdown_reordering_and_page_index);

let bloom_filter_only = self
.read_with_options(
ParquetScanOptions {
pushdown_filters: false,
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: true,
},
filter,
)
.await;
assert_eq!(no_pushdown, bloom_filter_only);

let pushdown_reordering_and_bloom_filter = self
.read_with_options(
ParquetScanOptions {
pushdown_filters: false,
reorder_filters: true,
enable_page_index: false,
enable_bloom_filter: true,
},
filter,
)
.await;

assert_eq!(no_pushdown, pushdown_reordering_and_bloom_filter);
}

/// Reads data from a test parquet file using the specified scan options
@@ -686,30 +579,6 @@ impl<'a> TestCase<'a> {
}
};

let row_groups_pruned_bloom_filter =
get_value(&metrics, "row_groups_pruned_bloom_filter");
println!(" row_groups_pruned_bloom_filter: {row_groups_pruned_bloom_filter}");

let bloom_filter_filtering_expected = if scan_options.enable_bloom_filter {
self.bloom_filter_filtering_expected
} else {
// if bloom filter filtering is not enabled, don't expect it
// to filter rows
BloomFilterFilteringExpected::None
};

match bloom_filter_filtering_expected {
BloomFilterFilteringExpected::None => {
assert_eq!(row_groups_pruned_bloom_filter, 0);
}
BloomFilterFilteringExpected::Some => {
assert!(
row_groups_pruned_bloom_filter > 0,
"Expected to filter rows via bloom filter but none were",
);
}
};

batch
}
}
28 changes: 28 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
@@ -66,7 +66,10 @@ enum Scenario {
Int32Range,
Float64,
Decimal,
DecimalBloomFilterInt32,
DecimalBloomFilterInt64,
DecimalLargePrecision,
DecimalLargePrecisionBloomFilter,
PeriodsInColumnNames,
}

@@ -549,6 +552,22 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2),
]
}
Scenario::DecimalBloomFilterInt32 => {
// decimal record batch
vec![
make_decimal_batch(vec![100, 200, 300, 400, 500], 6, 2),
make_decimal_batch(vec![100, 200, 300, 400, 600], 6, 2),
make_decimal_batch(vec![100, 200, 300, 400, 600], 6, 2),
]
}
Scenario::DecimalBloomFilterInt64 => {
// decimal record batch
vec![
make_decimal_batch(vec![100, 200, 300, 400, 500], 9, 2),
make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2),
make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2),
]
}
Scenario::DecimalLargePrecision => {
// decimal record batch with large precision;
// the data will be stored as FIXED_LEN_BYTE_ARRAY
@@ -558,6 +577,15 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2),
]
}
Scenario::DecimalLargePrecisionBloomFilter => {
// decimal record batch with large precision;
// the data will be stored as FIXED_LEN_BYTE_ARRAY
vec![
make_decimal_batch(vec![100, 200, 300, 400, 500], 38, 2),
make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2),
make_decimal_batch(vec![100, 200, 300, 400, 600], 38, 2),
]
}
Scenario::PeriodsInColumnNames => {
vec![
// all frontend
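The new scenarios reuse the module's `make_decimal_batch` helper, whose body sits outside this diff. A plausible reconstruction, assuming arrow's `Decimal128Array` API and the `decimal_col` column name the pruning tests query:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Decimal128Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// One "decimal_col" column; the raw i128 values are interpreted with the
// given precision and scale, so 500 at scale 2 is the decimal 5.00.
fn make_decimal_batch(v: Vec<i128>, precision: u8, scale: i8) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "decimal_col",
        DataType::Decimal128(precision, scale),
        true,
    )]));
    let array = Decimal128Array::from(v)
        .with_precision_and_scale(precision, scale)
        .unwrap();
    RecordBatch::try_new(schema, vec![Arc::new(array) as ArrayRef]).unwrap()
}
```

With precision 6 or 9 the arrow writer maps the column to parquet INT32 or INT64 storage, and at 38 to FIXED_LEN_BYTE_ARRAY, which is exactly the three code paths the scenarios exercise.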
33 changes: 33 additions & 0 deletions datafusion/core/tests/parquet/row_group_pruning.rs
@@ -578,6 +578,39 @@ async fn prune_decimal_in_list() {
6,
)
.await;

// test data -> r1: {1,2,3,4,5}, r2: {1,2,3,4,6}, r3: {1,2,3,4,6}
// `in (5)` matches only r1; min/max stats cannot prune r2 or r3
// (5 lies inside their [1, 6] range), but the bloom filter prunes both.
test_row_group_prune(
Scenario::DecimalBloomFilterInt32,
"SELECT * FROM t where decimal_col in (5)",
Some(0),
Some(0),
Some(2),
1,
)
.await;

// test data -> r1: {1,2,3,4,5}, r2: {1,2,3,4,6}, r3: {1,2,3,4,6}
test_row_group_prune(
Scenario::DecimalBloomFilterInt64,
"SELECT * FROM t where decimal_col in (5)",
Some(0),
Some(0),
Some(2),
1,
)
.await;

// test data -> r1: {1,2,3,4,5}, r2: {1,2,3,4,6}, r3: {1,2,3,4,6}
test_row_group_prune(
Scenario::DecimalLargePrecisionBloomFilter,
"SELECT * FROM t where decimal_col in (5)",
Some(0),
Some(0),
Some(2),
1,
)
.await;
}
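As a sanity check on those expected counts, a tiny standalone sketch (plain Rust, hypothetical `main`) of why min/max statistics keep all three row groups while an exact-membership structure such as a bloom filter keeps only one:

```rust
fn main() {
    // Raw decimal values at scale 2; 500 is the literal 5.00 being probed.
    let row_groups: [&[i64]; 3] = [
        &[100, 200, 300, 400, 500],
        &[100, 200, 300, 400, 600],
        &[100, 200, 300, 400, 600],
    ];
    let target = 500;

    // Min/max pruning keeps any group whose range contains the target.
    let stats_keep = row_groups
        .iter()
        .filter(|rg| {
            let min = *rg.iter().min().unwrap();
            let max = *rg.iter().max().unwrap();
            min <= target && target <= max
        })
        .count();

    // A bloom filter has no false negatives, so (false positives aside)
    // it keeps only the groups that really contain the value.
    let exact_keep = row_groups.iter().filter(|rg| rg.contains(&target)).count();

    assert_eq!(stats_keep, 3); // statistics prune nothing here
    assert_eq!(exact_keep, 1); // the bloom filter can prune the other two
}
```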

#[tokio::test]