Skip to content

Commit

Permalink
[Pruning] Support parquet bloom filter pruning for decimal128
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang committed Jan 20, 2024
1 parent 95e739c commit 9527792
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 18 deletions.
67 changes: 53 additions & 14 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
use datafusion_common::{Column, ScalarValue};
use parquet::basic::Type;
use parquet::data_type::Decimal;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
Expand Down Expand Up @@ -143,7 +145,10 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
continue;
}
};
column_sbbf.insert(column_name.to_string(), bf);
let physical_type =
builder.parquet_schema().column(column_idx).physical_type();

column_sbbf.insert(column_name.to_string(), (bf, physical_type));
}

let stats = BloomFilterStatistics { column_sbbf };
Expand All @@ -169,8 +174,8 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<

/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF)
struct BloomFilterStatistics {
/// Maps column name to the parquet bloom filter
column_sbbf: HashMap<String, Sbbf>,
/// Maps column name to the parquet bloom filter and parquet physical type
column_sbbf: HashMap<String, (Sbbf, Type)>,
}

impl PruningStatistics for BloomFilterStatistics {
Expand Down Expand Up @@ -200,7 +205,7 @@ impl PruningStatistics for BloomFilterStatistics {
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
let sbbf = self.column_sbbf.get(column.name.as_str())?;
let (sbbf, parquet_type) = self.column_sbbf.get(column.name.as_str())?;

// Bloom filters are probabilistic data structures that can return false
// positives (i.e. it might return true even if the value is not
Expand All @@ -209,16 +214,50 @@ impl PruningStatistics for BloomFilterStatistics {

let known_not_present = values
.iter()
.map(|value| match value {
ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => sbbf.check(v),
ScalarValue::Float64(Some(v)) => sbbf.check(v),
ScalarValue::Float32(Some(v)) => sbbf.check(v),
ScalarValue::Int64(Some(v)) => sbbf.check(v),
ScalarValue::Int32(Some(v)) => sbbf.check(v),
ScalarValue::Int16(Some(v)) => sbbf.check(v),
ScalarValue::Int8(Some(v)) => sbbf.check(v),
_ => true,
.map(|value| {
match value {
ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => sbbf.check(v),
ScalarValue::Float64(Some(v)) => sbbf.check(v),
ScalarValue::Float32(Some(v)) => sbbf.check(v),
ScalarValue::Int64(Some(v)) => sbbf.check(v),
ScalarValue::Int32(Some(v)) => sbbf.check(v),
ScalarValue::Int16(Some(v)) => sbbf.check(v),
ScalarValue::Int8(Some(v)) => sbbf.check(v),
ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
Type::INT32 => {
//https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
// All physical type are little-endian
let b = (*v as i32).to_le_bytes();
let decimal = Decimal::Int32 {
value: b,
precision: *p as i32,
scale: *s as i32,
};
sbbf.check(&decimal)
}
Type::INT64 => {
let b = (*v as i64).to_le_bytes();
let decimal = Decimal::Int64 {
value: b,
precision: *p as i32,
scale: *s as i32,
};
sbbf.check(&decimal)
}
Type::FIXED_LEN_BYTE_ARRAY => {
let b = v.to_le_bytes().to_vec();
let decimal = Decimal::Bytes {
value: b.into(),
precision: *p as i32,
scale: *s as i32,
};
sbbf.check(&decimal)
}
_ => true,
},
_ => true,
}
})
// The row group doesn't contain any of the values if
// all the checks are false
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct ParquetScanOptions {
pub reorder_filters: bool,
/// enable page index
pub enable_page_index: bool,
/// enable bloom filter
pub enable_bloom_filter: bool,
}

impl ParquetScanOptions {
Expand All @@ -69,6 +71,7 @@ impl ParquetScanOptions {
config.execution.parquet.pushdown_filters = self.pushdown_filters;
config.execution.parquet.reorder_filters = self.reorder_filters;
config.execution.parquet.enable_page_index = self.enable_page_index;
config.execution.parquet.bloom_filter_enabled = self.enable_bloom_filter;
config.into()
}
}
Expand Down
131 changes: 131 additions & 0 deletions datafusion/core/tests/parquet/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use datafusion::physical_plan::collect;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
use datafusion_common::ScalarValue;
use datafusion_expr::utils::{conjunction, disjunction, split_conjunction};
use itertools::Itertools;
use parquet::file::properties::WriterProperties;
Expand Down Expand Up @@ -338,6 +339,59 @@ async fn single_file_small_data_pages() {
.await;
}

#[cfg(not(target_family = "windows"))]
#[tokio::test]
async fn single_file_with_bloom_filter() {
    let tempdir = TempDir::new().unwrap();

    let props = WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .build();
    let test_parquet_file = generate_file(&tempdir, props);

    // One case per physical encoding of Decimal128 in parquet:
    // (test name, column, decimal precision).
    let cases = [
        ("bloom_filter_on_decimal128_int32", "decimal128_id_int32", 8),
        ("bloom_filter_on_decimal128_int64", "decimal128_id_int64", 12),
        (
            "bloom_filter_on_decimal128_fixed_len_byte_array",
            "decimal128_id_fixed_len_byte",
            38,
        ),
    ];

    for (name, column, precision) in cases {
        // predicate is chosen carefully to prune row groups id from 0 to 99 except 50
        // <column> = 50
        TestCase::new(&test_parquet_file)
            .with_name(name)
            .with_filter(col(column).in_list(
                vec![lit(ScalarValue::Decimal128(Some(50), precision, 0))],
                false,
            ))
            .with_bloom_filter_filtering_expected(BloomFilterFilteringExpected::Some)
            .with_pushdown_expected(PushdownExpected::None)
            .with_expected_rows(0)
            .run()
            .await;
    }
}

/// Expected pushdown behavior
#[derive(Debug, Clone, Copy)]
enum PushdownExpected {
Expand All @@ -356,6 +410,14 @@ enum PageIndexFilteringExpected {
Some,
}

/// Expected bloom-filter row-group pruning behavior for a test case
#[derive(Debug, Clone, Copy)]
enum BloomFilterFilteringExpected {
    /// Did not expect the bloom filter to prune any row groups
    None,
    /// Expected the bloom filter to prune more than 0 row groups
    Some,
}

/// parameters for running a test
struct TestCase<'a> {
test_parquet_file: &'a TestParquetFile,
Expand All @@ -369,6 +431,9 @@ struct TestCase<'a> {
/// Did we expect page filtering to filter out pages
page_index_filtering_expected: PageIndexFilteringExpected,

/// Did we expect bloom filter filtering to filter out row
bloom_filter_filtering_expected: BloomFilterFilteringExpected,

/// How many rows are expected to pass the predicate overall?
expected_rows: usize,
}
Expand All @@ -382,6 +447,7 @@ impl<'a> TestCase<'a> {
filter: lit(true),
pushdown_expected: PushdownExpected::None,
page_index_filtering_expected: PageIndexFilteringExpected::None,
bloom_filter_filtering_expected: BloomFilterFilteringExpected::None,
expected_rows: 0,
}
}
Expand Down Expand Up @@ -412,6 +478,15 @@ impl<'a> TestCase<'a> {
self
}

/// Set the expected bloom filter filtering
fn with_bloom_filter_filtering_expected(
    mut self,
    v: BloomFilterFilteringExpected,
) -> Self {
    self.bloom_filter_filtering_expected = v;
    self
}

/// Set the number of expected rows (to ensure the predicates have
/// a good range of selectivity
fn with_expected_rows(mut self, expected_rows: usize) -> Self {
Expand Down Expand Up @@ -442,6 +517,7 @@ impl<'a> TestCase<'a> {
pushdown_filters: false,
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
},
filter,
)
Expand All @@ -453,6 +529,7 @@ impl<'a> TestCase<'a> {
pushdown_filters: true,
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
},
filter,
)
Expand All @@ -466,6 +543,7 @@ impl<'a> TestCase<'a> {
pushdown_filters: true,
reorder_filters: true,
enable_page_index: false,
enable_bloom_filter: false,
},
filter,
)
Expand All @@ -479,6 +557,7 @@ impl<'a> TestCase<'a> {
pushdown_filters: false,
reorder_filters: false,
enable_page_index: true,
enable_bloom_filter: false,
},
filter,
)
Expand All @@ -491,12 +570,40 @@ impl<'a> TestCase<'a> {
pushdown_filters: true,
reorder_filters: true,
enable_page_index: true,
enable_bloom_filter: false,
},
filter,
)
.await;

assert_eq!(no_pushdown, pushdown_reordering_and_page_index);

let bloom_filter_only = self
.read_with_options(
ParquetScanOptions {
pushdown_filters: false,
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: true,
},
filter,
)
.await;
assert_eq!(no_pushdown, bloom_filter_only);

let pushdown_reordering_and_bloom_filter = self
.read_with_options(
ParquetScanOptions {
pushdown_filters: false,
reorder_filters: true,
enable_page_index: false,
enable_bloom_filter: true,
},
filter,
)
.await;

assert_eq!(no_pushdown, pushdown_reordering_and_bloom_filter);
}

/// Reads data from a test parquet file using the specified scan options
Expand Down Expand Up @@ -579,6 +686,30 @@ impl<'a> TestCase<'a> {
}
};

let row_groups_pruned_bloom_filter =
get_value(&metrics, "row_groups_pruned_bloom_filter");
println!(" row_groups_pruned_bloom_filter: {row_groups_pruned_bloom_filter}");

let bloom_filter_filtering_expected = if scan_options.enable_bloom_filter {
self.bloom_filter_filtering_expected
} else {
// if bloom filter filtering is not enabled, don't expect it
// to filter rows
BloomFilterFilteringExpected::None
};

match bloom_filter_filtering_expected {
BloomFilterFilteringExpected::None => {
assert_eq!(row_groups_pruned_bloom_filter, 0);
}
BloomFilterFilteringExpected::Some => {
assert!(
row_groups_pruned_bloom_filter > 0,
"Expected to filter rows via bloom filter but none were",
);
}
};

batch
}
}
Expand Down
Loading

0 comments on commit 9527792

Please sign in to comment.