Skip to content

Commit

Permalink
Write null counts in parquet files when they are present
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 15, 2024
1 parent 69b17ad commit 2d70413
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 54 deletions.
27 changes: 26 additions & 1 deletion parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,8 @@ pub struct StatisticsConverter<'a> {
parquet_column_index: Option<usize>,
/// The field (with data type) of the column in the Arrow schema
arrow_field: &'a Field,
/// treat missing null_counts as 0 nulls
missing_null_counts_as_zero: bool,
}

impl<'a> StatisticsConverter<'a> {
Expand All @@ -1195,6 +1197,18 @@ impl<'a> StatisticsConverter<'a> {
self.arrow_field
}

/// Set the statistics converter to treat missing null counts as missing
///
/// By default, the converter will treat missing null counts as 0 nulls.
///
/// Due to <https://github.com/apache/arrow-rs/pull/6257>, prior to version
/// 53.0.0, parquet files written by parquet-rs did not store null counts
/// when there were zero nulls.
pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
self.missing_null_counts_as_zero = missing_null_counts_as_zero;
self
}

/// Returns a [`UInt64Array`] with row counts for each row group
///
/// # Return Value
Expand Down Expand Up @@ -1288,6 +1302,7 @@ impl<'a> StatisticsConverter<'a> {
Ok(Self {
parquet_column_index: parquet_index,
arrow_field,
missing_null_counts_as_zero: true,
})
}

Expand Down Expand Up @@ -1386,7 +1401,15 @@ impl<'a> StatisticsConverter<'a> {
let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.and_then(|s| s.null_count_opt()));
.map(|s| {
s.and_then(|s| {
if self.missing_null_counts_as_zero {
Some(s.null_count_opt().unwrap_or(0))
} else {
s.null_count_opt()
}
})
});
Ok(UInt64Array::from_iter(null_counts))
}

Expand Down Expand Up @@ -1597,3 +1620,5 @@ impl<'a> StatisticsConverter<'a> {
new_null_array(data_type, num_row_groups)
}
}

// See tests in parquet/tests/arrow_reader/statistics.rs
160 changes: 136 additions & 24 deletions parquet/src/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,35 @@ pub fn from_thrift(
) -> Result<Option<Statistics>> {
Ok(match thrift_stats {
Some(stats) => {
// Number of nulls recorded, when it is not available, we just mark it as 0.
// TODO this should be `None` if there is no information about NULLS.
// see https://github.com/apache/arrow-rs/pull/6216/files
let null_count = stats.null_count.unwrap_or(0);

if null_count < 0 {
return Err(ParquetError::General(format!(
"Statistics null count is negative {}",
null_count
)));
}
// transform null count to u64
let null_count = stats
.null_count
.map(|null_count| {
if null_count < 0 {
return Err(ParquetError::General(format!(
"Statistics null count is negative {}",
null_count
)));
}
Ok(null_count as u64)
})
.transpose()?;

// Generic null count.
let null_count = Some(null_count as u64);
// Generic distinct count (count of distinct values occurring)
let distinct_count = stats.distinct_count.map(|value| value as u64);
let distinct_count = stats
.distinct_count
.map(|distinct_count| {
if distinct_count < 0 {
return Err(ParquetError::General(format!(
"Statistics distinct count is negative {}",
distinct_count
)));
}

Ok(distinct_count as u64)
})
.transpose()?;

// Whether or not statistics use deprecated min/max fields.
let old_format = stats.min_value.is_none() && stats.max_value.is_none();
// Generic min value as bytes.
Expand Down Expand Up @@ -244,20 +257,21 @@ pub fn from_thrift(
pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
let stats = stats?;

// record null counts if greater than zero.
//
// TODO: This should be Some(0) if there are no nulls.
// see https://github.com/apache/arrow-rs/pull/6216/files
// record null count if it can fit in i64
let null_count = stats
.null_count_opt()
.map(|value| value as i64)
.filter(|&x| x > 0);
.and_then(|value| i64::try_from(value).ok());

// record distinct count if it can fit in i64
let distinct_count = stats
.distinct_count()
.and_then(|value| i64::try_from(value).ok());

let mut thrift_stats = TStatistics {
max: None,
min: None,
null_count,
distinct_count: stats.distinct_count().map(|value| value as i64),
distinct_count,
max_value: None,
min_value: None,
is_max_value_exact: None,
Expand Down Expand Up @@ -404,9 +418,20 @@ impl Statistics {
/// Returns number of null values for the column, if known.
/// Note that this includes all nulls when column is part of the complex type.
///
/// Note this API returns Some(0) even if the null count was not present
/// in the statistics.
/// See <https://github.com/apache/arrow-rs/pull/6216/files>
/// Note: Versions of this library prior to `53.0.0` returned 0 if the null count was
/// not available. This method returns `None` in that case.
///
/// Also, versions of this library prior to `53.0.0` did not store the null count in the
/// statistics if the null count was `0`.
///
/// To preserve the prior behavior and read null counts properly from older files
/// you should default to zero:
///
/// ```no_run
/// # use parquet::file::statistics::Statistics;
/// # let statistics: Statistics = todo!();
/// let null_count = statistics.null_count_opt().unwrap_or(0);
/// ```
pub fn null_count_opt(&self) -> Option<u64> {
statistics_enum_func![self, null_count_opt]
}
Expand Down Expand Up @@ -1041,4 +1066,91 @@ mod tests {
true,
));
}

#[test]
fn test_count_encoding() {
statistics_count_test(None, None);
statistics_count_test(Some(0), Some(0));
statistics_count_test(Some(100), Some(2000));
statistics_count_test(Some(1), None);
statistics_count_test(None, Some(1));
}

#[test]
fn test_count_encoding_distinct_too_large() {
// statistics are stored using i64, so test trying to store larger values
let statistics = make_bool_stats(Some(u64::MAX), Some(100));
let thrift_stats = to_thrift(Some(&statistics)).unwrap();
assert_eq!(thrift_stats.distinct_count, None); // can't store u64 max --> null
assert_eq!(thrift_stats.null_count, Some(100));
}

#[test]
fn test_count_encoding_null_too_large() {
// statistics are stored using i64, so test trying to store larger values
let statistics = make_bool_stats(Some(100), Some(u64::MAX));
let thrift_stats = to_thrift(Some(&statistics)).unwrap();
assert_eq!(thrift_stats.distinct_count, Some(100));
assert_eq!(thrift_stats.null_count, None); // can' store u64 max --> null
}

#[test]
fn test_count_decoding_distinct_invalid() {
let tstatistics = TStatistics {
distinct_count: Some(-42),
..Default::default()
};
let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err();
assert_eq!(
err.to_string(),
"Parquet error: Statistics distinct count is negative -42"
);
}

#[test]
fn test_count_decoding_null_invalid() {
let tstatistics = TStatistics {
null_count: Some(-42),
..Default::default()
};
let err = from_thrift(Type::BOOLEAN, Some(tstatistics)).unwrap_err();
assert_eq!(
err.to_string(),
"Parquet error: Statistics null count is negative -42"
);
}

/// Writes statistics to thrift and reads them back and ensures:
/// - The statistics are the same
/// - The statistics written to thrift are the same as the original statistics
fn statistics_count_test(distinct_count: Option<u64>, null_count: Option<u64>) {
let statistics = make_bool_stats(distinct_count, null_count);

let thrift_stats = to_thrift(Some(&statistics)).unwrap();
assert_eq!(thrift_stats.null_count.map(|c| c as u64), null_count);
assert_eq!(
thrift_stats.distinct_count.map(|c| c as u64),
distinct_count
);

let round_tripped = from_thrift(Type::BOOLEAN, Some(thrift_stats))
.unwrap()
.unwrap();
assert_eq!(round_tripped, statistics);
}

fn make_bool_stats(distinct_count: Option<u64>, null_count: Option<u64>) -> Statistics {
let min = Some(true);
let max = Some(false);
let is_min_max_deprecated = false;

// test is about the counts, so we aren't really testing the min/max values
Statistics::Boolean(ValueStatistics::new(
min,
max,
distinct_count,
null_count,
is_min_max_deprecated,
))
}
}
67 changes: 64 additions & 3 deletions parquet/tests/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::default::Default;
use std::fs::File;
use std::sync::Arc;

use super::make_test_file_rg;
use super::{struct_array, Scenario};
use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{
Expand All @@ -37,16 +38,17 @@ use arrow_array::{
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use half::f16;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::properties::{EnabledStatistics, WriterProperties};

use super::make_test_file_rg;
use parquet::file::statistics::{Statistics, ValueStatistics};
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};

#[derive(Debug, Default, Clone)]
struct Int64Case {
Expand Down Expand Up @@ -2139,6 +2141,65 @@ async fn test_missing_statistics() {
.run();
}

#[test]
fn missing_null_counts_as_zero() {
let min = None;
let max = None;
let distinct_count = None;
let null_count = None; // NB: no null count
let is_min_max_deprecated = false;
let stats = Statistics::Boolean(ValueStatistics::new(
min,
max,
distinct_count,
null_count,
is_min_max_deprecated,
));
let (arrow_schema, parquet_schema) = bool_arrow_and_parquet_schema();

let column_chunk = ColumnChunkMetaData::builder(parquet_schema.column(0))
.set_statistics(stats)
.build()
.unwrap();
let metadata = RowGroupMetaData::builder(parquet_schema.clone())
.set_column_metadata(vec![column_chunk])
.build()
.unwrap();

let converter = StatisticsConverter::try_new("b", &arrow_schema, &parquet_schema).unwrap();

// by default null count should be 0
assert_eq!(
converter.row_group_null_counts([&metadata]).unwrap(),
UInt64Array::from_iter(vec![Some(0)])
);

// if we disable missing null counts as zero flag null count will be None
let converter = converter.with_missing_null_counts_as_zero(false);
assert_eq!(
converter.row_group_null_counts([&metadata]).unwrap(),
UInt64Array::from_iter(vec![None])
);
}

/// return an Arrow schema and corresponding Parquet SchemaDescriptor for
/// a schema with a single boolean column "b"
fn bool_arrow_and_parquet_schema() -> (SchemaRef, SchemaDescPtr) {
let arrow_schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Boolean, true)]));
use parquet::schema::types::Type as ParquetType;
let parquet_schema = ParquetType::group_type_builder("schema")
.with_fields(vec![Arc::new(
ParquetType::primitive_type_builder("a", parquet::basic::Type::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap();

let parquet_schema = Arc::new(SchemaDescriptor::new(Arc::new(parquet_schema)));
(arrow_schema, parquet_schema)
}

/////// NEGATIVE TESTS ///////
// column not found
#[tokio::test]
Expand Down
Loading

0 comments on commit 2d70413

Please sign in to comment.