From c922ba1cb3643b4fb66e79219ded07a0a647a5e1 Mon Sep 17 00:00:00 2001
From: NGA-TRAN
Date: Mon, 20 May 2024 17:37:14 -0400
Subject: [PATCH 1/2] test: add more tests for statistics reading

---
 .../core/tests/parquet/arrow_statistics.rs | 243 ++++++++++++++++--
 datafusion/core/tests/parquet/mod.rs       |   2 +-
 2 files changed, 229 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 272afea7b28a..9a9aa0c36ce1 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,8 +22,7 @@ use std::fs::File;
 use std::sync::Arc;
 
 use arrow_array::{
-    make_array, Array, ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array,
-    RecordBatch, UInt64Array,
+    make_array, Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -624,20 +623,234 @@ async fn test_dates_64_diff_rg_sizes() {
     .run("date64");
 }
 
+// BUG:
+// Todo: open a ticket
+#[tokio::test]
+async fn test_uint() {
+    let row_per_group = 4;
+
+    // This creates a parquet file of 4 columns named "u8", "u16", "u32", "u64"
+    // "u8" --> UInt8Array
+    // "u16" --> UInt16Array
+    // "u32" --> UInt32Array
+    // "u64" --> UInt64Array
+
+    // The file is written from 4 record batches, each with 5 rows; the 20 rows
+    // are then split into 5 row groups of size 4
+    let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
+
+    // u8
+    // BUG: expect UInt8Array but returns Int32Array
+    Test {
+        reader,
+        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt8Array
+        expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt8Array
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+    }
+    .run("u8");
+
+    // u16
+    // BUG: expect UInt16Array but returns Int32Array
+    let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt16Array
+        expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt16Array
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+    }
+    .run("u16");
+
+    // u32
+    // BUG: expect UInt32Array but returns Int32Array
+    let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt32Array
+        expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt32Array
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+    }
+    .run("u32");
+
+    // u64
+    // BUG: expect UInt64Array but returns Int64Array
+    let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(Int64Array::from(vec![0, 1, 4, 7, 251])), // should be UInt64Array
+        expected_max: Arc::new(Int64Array::from(vec![3, 4, 6, 250, 254])), // should be UInt64Array
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
+    }
+    .run("u64");
+}
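// Not part of this patch: a minimal workaround sketch for the bug noted above.
// Until unsigned statistics stop being widened to signed arrays, a caller can
// coerce them back with arrow's cast kernel. `stats_min` stands in for the
// array the statistics reader actually returned for the "u8" column.
use arrow::compute::cast;
use arrow_array::ArrayRef;
use arrow_schema::DataType;

fn coerce_stats_to_u8(stats_min: ArrayRef) -> ArrayRef {
    // Int32 -> UInt8 is lossless here because the file only ever stored u8 values.
    cast(stats_min.as_ref(), &DataType::UInt8).unwrap()
}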
.run("u64"); +} + +#[tokio::test] +async fn test_int32_range() { + let row_per_group = 5; + // This creates a parquet file of 1 column "i" + // file has 2 record batches, each has 2 rows. They will be saved into one row group + let reader = parquet_file_many_columns(Scenario::Int32Range, row_per_group).await; + + Test { + reader, + expected_min: Arc::new(Int32Array::from(vec![0])), + expected_max: Arc::new(Int32Array::from(vec![300000])), + expected_null_counts: UInt64Array::from(vec![0]), + expected_row_counts: UInt64Array::from(vec![4]), + } + .run("i"); +} + +// BUG: not convert UInt32Array to Int32Array +// todo: file a ticket +#[tokio::test] +async fn test_uint32_range() { + let row_per_group = 5; + // This creates a parquet file of 1 column "u" + // file has 2 record batches, each has 2 rows. They will be saved into one row group + let reader = parquet_file_many_columns(Scenario::UInt32Range, row_per_group).await; + + Test { + reader, + expected_min: Arc::new(Int32Array::from(vec![0])), // shoudld be UInt32Array + expected_max: Arc::new(Int32Array::from(vec![300000])), // shoudld be UInt32Array + expected_null_counts: UInt64Array::from(vec![0]), + expected_row_counts: UInt64Array::from(vec![4]), + } + .run("u"); +} + +#[tokio::test] +async fn test_float64() { + let row_per_group = 5; + // This creates a parquet file of 1 column "f" + // file has 4 record batches, each has 5 rows. They will be saved into 4 row groups + let reader = parquet_file_many_columns(Scenario::Float64, row_per_group).await; + + Test { + reader, + expected_min: Arc::new(Float64Array::from(vec![-5.0, -4.0, -0.0, 5.0])), + expected_max: Arc::new(Float64Array::from(vec![-1.0, 0.0, 4.0, 9.0])), + expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + } + .run("f"); +} + +#[tokio::test] +async fn test_decimal() { + let row_per_group = 5; + // This creates a parquet file of 1 column "decimal_col" with decimal data type and precicion 9, scale 2 + // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups + let reader = parquet_file_many_columns(Scenario::Decimal, row_per_group).await; + + Test { + reader, + expected_min: Arc::new(Decimal128Array::from(vec![100, -500, 2000]).with_precision_and_scale(9, 2).unwrap()), + expected_max: Arc::new(Decimal128Array::from(vec![600, 600, 6000]).with_precision_and_scale(9, 2).unwrap()), + expected_null_counts: UInt64Array::from(vec![0, 0, 0]), + expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + } + .run("decimal_col"); +} + +// BUG: not convert BinaryArray to StringArray +// todo: file a ticket +#[tokio::test] +async fn test_byte() { + let row_per_group = 5; + + // This creates a parquet file of 4 columns + // "name" + // "service_string" + // "service_binary" + // "service_fixedsize" + + // file has 3 record batches, each has 5 rows. 
+
+// BUG: should not convert BinaryArray to StringArray
+// todo: file a ticket
+#[tokio::test]
+async fn test_byte() {
+    let row_per_group = 5;
+
+    // This creates a parquet file of 4 columns
+    // "name"
+    // "service_string"
+    // "service_binary"
+    // "service_fixedsize"
+
+    // file has 3 record batches, each with 5 rows. They will be saved into 3 row groups
+    let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
+
+    // column "name"
+    Test {
+        reader,
+        expected_min: Arc::new(StringArray::from(vec!["all frontends", "mixed", "all backends"])),
+        expected_max: Arc::new(StringArray::from(vec!["all frontends", "mixed", "all backends"])),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+    }
+    .run("name");
+
+    // column "service_string"
+    let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(StringArray::from(vec!["frontend five", "backend one", "backend eight"])),
+        expected_max: Arc::new(StringArray::from(vec!["frontend two", "frontend six", "backend six"])),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+    }
+    .run("service_string");
+
+    // column "service_binary"
+    let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(StringArray::from(vec!["frontend five", "backend one", "backend eight"])), // Should be ByteArray
+        expected_max: Arc::new(StringArray::from(vec!["frontend two", "frontend six", "backend six"])), // Should be ByteArray
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+    }
+    .run("service_binary");
+
+    // column "service_fixedsize"
+    // b"fe1", b"be1", b"be4"
+    let min_input = vec![vec![102, 101, 49], vec![98, 101, 49], vec![98, 101, 52]];
+    // b"fe7", b"fe6", b"be8"
+    let max_input = vec![vec![102, 101, 55], vec![102, 101, 54], vec![98, 101, 56]];
+    let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(FixedSizeBinaryArray::try_from_iter(min_input.into_iter()).unwrap()),
+        expected_max: Arc::new(FixedSizeBinaryArray::try_from_iter(max_input.into_iter()).unwrap()),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+    }
+    .run("service_fixedsize");
+}
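// Side note (not part of the patch): the ASCII vectors above can also be
// written as byte-string literals, which keeps the comment and the data in
// one place; b"fe1" is exactly vec![102, 101, 49].
use arrow_array::FixedSizeBinaryArray;

fn fixed_size_binary_example() {
    let min_input: Vec<&[u8]> = vec![b"fe1", b"be1", b"be4"];
    let arr = FixedSizeBinaryArray::try_from_iter(min_input.into_iter()).unwrap();
    assert_eq!(arr.value(0), b"fe1");
    assert_eq!(arr.value_length(), 3);
}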
+
+// PeriodsInColumnNames
+#[tokio::test]
+async fn test_period_in_column_names() {
+    let row_per_group = 5;
+    // This creates a parquet file of 2 columns "name" and "service.name"
+    // file has 3 record batches, each with 5 rows. They will be saved into 3 row groups
+    let reader = parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
+
+    // column "name"
+    Test {
+        reader,
+        expected_min: Arc::new(StringArray::from(vec!["HTTP GET / DISPATCH", "HTTP PUT / DISPATCH", "HTTP GET / DISPATCH"])),
+        expected_max: Arc::new(StringArray::from(vec!["HTTP GET / DISPATCH", "HTTP PUT / DISPATCH", "HTTP GET / DISPATCH"])),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+    }
+    .run("name");
+
+    // column "service.name"
+    let reader = parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
+    Test {
+        reader,
+        expected_min: Arc::new(StringArray::from(vec!["frontend", "backend", "backend"])),
+        expected_max: Arc::new(StringArray::from(vec![ "frontend", "frontend", "backend"])),
+        expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
+        expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
+    }
+    .run("service.name");
+}
+
 // TODO:
-// Other data types to tests
-// `u8`, `u16`, `u32`, and `u64`,
-// UInt,
-// UInt32Range,
-// Float64,
-// Decimal,
-// DecimalBloomFilterInt32,
-// DecimalBloomFilterInt64,
-// DecimalLargePrecision,
-// DecimalLargePrecisionBloomFilter,
-// ByteArray,
-// PeriodsInColumnNames,
-// WithNullValuesPageLevel,
 // WITHOUT Stats
 
 /////// NEGATIVE TESTS ///////
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 6e3f366b4373..bdc39c269d29 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -427,7 +427,7 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch {
     .unwrap()
 }
 
-/// Return record batch with i8, i16, i32, and i64 sequences
+/// Return record batch with u8, u16, u32, and u64 sequences
 ///
 /// Columns are named
 /// "u8" -> UInt8Array
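// Rough sketch of the kind of helper the doc comment above describes. The real
// builder lives in datafusion/core/tests/parquet/mod.rs; the function name and
// the `&[u64]` seed parameter here are assumptions for illustration only.
use std::sync::Arc;

use arrow_array::{RecordBatch, UInt16Array, UInt32Array, UInt64Array, UInt8Array};
use arrow_schema::{DataType, Field, Schema};

fn make_uint_batch_sketch(values: &[u64]) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("u8", DataType::UInt8, false),
        Field::new("u16", DataType::UInt16, false),
        Field::new("u32", DataType::UInt32, false),
        Field::new("u64", DataType::UInt64, false),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(UInt8Array::from_iter_values(values.iter().map(|v| *v as u8))),
            Arc::new(UInt16Array::from_iter_values(values.iter().map(|v| *v as u16))),
            Arc::new(UInt32Array::from_iter_values(values.iter().map(|v| *v as u32))),
            Arc::new(UInt64Array::from_iter_values(values.iter().copied())),
        ],
    )
    .unwrap()
}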
From d89c8c6dc1a469e7e5543684777ba2da94a80573 Mon Sep 17 00:00:00 2001
From: NGA-TRAN
Date: Tue, 21 May 2024 12:10:28 -0400
Subject: [PATCH 2/2] Link bug tickets to the tests and run fmt

---
 .../core/tests/parquet/arrow_statistics.rs | 106 +++++++++++++-----
 1 file changed, 77 insertions(+), 29 deletions(-)

diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs
index 9a9aa0c36ce1..432d109b2235 100644
--- a/datafusion/core/tests/parquet/arrow_statistics.rs
+++ b/datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,7 +22,8 @@ use std::fs::File;
 use std::sync::Arc;
 
 use arrow_array::{
-    make_array, Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array
+    make_array, Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, Float64Array,
+    Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array,
 };
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::datasource::physical_plan::parquet::{
@@ -623,8 +624,8 @@ async fn test_dates_64_diff_rg_sizes() {
     .run("date64");
 }
 
-// BUG:
-// Todo: open a ticket
+// BUG:
+// https://github.com/apache/datafusion/issues/10604
 #[tokio::test]
 async fn test_uint() {
     let row_per_group = 4;
@@ -642,7 +643,7 @@ async fn test_uint() {
     // BUG: expect UInt8Array but returns Int32Array
     Test {
         reader,
-        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt8Array
+        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])),   // should be UInt8Array
         expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt8Array
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
     }
@@ -654,7 +655,7 @@ async fn test_uint() {
     let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
     Test {
         reader,
-        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt16Array
+        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])),   // should be UInt16Array
         expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt16Array
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
     }
@@ -666,8 +667,8 @@ async fn test_uint() {
     let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
     Test {
         reader,
-        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt32Array
-        expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt32Array
+        expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])),   // should be UInt32Array
+        expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt32Array
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
     }
@@ -678,7 +679,7 @@ async fn test_uint() {
     let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
     Test {
         reader,
-        expected_min: Arc::new(Int64Array::from(vec![0, 1, 4, 7, 251])), // should be UInt64Array
+        expected_min: Arc::new(Int64Array::from(vec![0, 1, 4, 7, 251])),   // should be UInt64Array
         expected_max: Arc::new(Int64Array::from(vec![3, 4, 6, 250, 254])), // should be UInt64Array
         expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
     }
@@ -704,7 +705,7 @@ async fn test_int32_range() {
 }
 
 // BUG: should not convert UInt32Array to Int32Array
-// todo: file a ticket
+// https://github.com/apache/datafusion/issues/10604
 #[tokio::test]
 async fn test_uint32_range() {
     let row_per_group = 5;
@@ -714,8 +715,8 @@ async fn test_uint32_range() {
 
     Test {
         reader,
-        expected_min: Arc::new(Int32Array::from(vec![0])), // should be UInt32Array
-        expected_max: Arc::new(Int32Array::from(vec![300000])), // should be UInt32Array
+        expected_min: Arc::new(Int32Array::from(vec![0])),      // should be UInt32Array
+        expected_max: Arc::new(Int32Array::from(vec![300000])), // should be UInt32Array
         expected_null_counts: UInt64Array::from(vec![0]),
         expected_row_counts: UInt64Array::from(vec![4]),
@@ -748,8 +749,16 @@ async fn test_decimal() {
 
     Test {
         reader,
-        expected_min: Arc::new(Decimal128Array::from(vec![100, -500, 2000]).with_precision_and_scale(9, 2).unwrap()),
-        expected_max: Arc::new(Decimal128Array::from(vec![600, 600, 6000]).with_precision_and_scale(9, 2).unwrap()),
+        expected_min: Arc::new(
+            Decimal128Array::from(vec![100, -500, 2000])
+                .with_precision_and_scale(9, 2)
+                .unwrap(),
+        ),
+        expected_max: Arc::new(
+            Decimal128Array::from(vec![600, 600, 6000])
+                .with_precision_and_scale(9, 2)
+                .unwrap(),
+        ),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
     }
@@ -757,7 +766,7 @@ async fn test_decimal() {
 }
 
 // BUG: should not convert BinaryArray to StringArray
-// todo: file a ticket
+// https://github.com/apache/datafusion/issues/10605
 #[tokio::test]
 async fn test_byte() {
     let row_per_group = 5;
"service_binary" // "service_fixedsize" - + // file has 3 record batches, each has 5 rows. They will be saved into 3 row groups let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await; // column "name" Test { reader, - expected_min: Arc::new(StringArray::from(vec!["all frontends", "mixed", "all backends"])), - expected_max: Arc::new(StringArray::from(vec!["all frontends", "mixed", "all backends"])), + expected_min: Arc::new(StringArray::from(vec![ + "all frontends", + "mixed", + "all backends", + ])), + expected_max: Arc::new(StringArray::from(vec![ + "all frontends", + "mixed", + "all backends", + ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), } @@ -785,8 +802,16 @@ async fn test_byte() { let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await; Test { reader, - expected_min: Arc::new(StringArray::from(vec!["frontend five", "backend one", "backend eight"])), - expected_max: Arc::new(StringArray::from(vec!["frontend two", "frontend six", "backend six"])), + expected_min: Arc::new(StringArray::from(vec![ + "frontend five", + "backend one", + "backend eight", + ])), + expected_max: Arc::new(StringArray::from(vec![ + "frontend two", + "frontend six", + "backend six", + ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), } @@ -796,8 +821,16 @@ async fn test_byte() { let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await; Test { reader, - expected_min: Arc::new(StringArray::from(vec!["frontend five", "backend one", "backend eight"])), // Shuld be ByteArray - expected_max: Arc::new(StringArray::from(vec!["frontend two", "frontend six", "backend six"])), // Shuld be ByteArray + expected_min: Arc::new(StringArray::from(vec![ + "frontend five", + "backend one", + "backend eight", + ])), // Shuld be BinaryArray + expected_max: Arc::new(StringArray::from(vec![ + "frontend two", + "frontend six", + "backend six", + ])), // Shuld be BinaryArray expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), } @@ -811,8 +844,12 @@ async fn test_byte() { let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await; Test { reader, - expected_min: Arc::new(FixedSizeBinaryArray::try_from_iter(min_input.into_iter()).unwrap()), - expected_max: Arc::new(FixedSizeBinaryArray::try_from_iter(max_input.into_iter()).unwrap()), + expected_min: Arc::new( + FixedSizeBinaryArray::try_from_iter(min_input.into_iter()).unwrap(), + ), + expected_max: Arc::new( + FixedSizeBinaryArray::try_from_iter(max_input.into_iter()).unwrap(), + ), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), expected_row_counts: UInt64Array::from(vec![5, 5, 5]), } @@ -825,31 +862,42 @@ async fn test_period_in_column_names() { let row_per_group = 5; // This creates a parquet file of 2 columns "name" and "service.name" // file has 3 record batches, each has 5 rows. 
@@ -825,31 +862,42 @@ async fn test_period_in_column_names() {
     let row_per_group = 5;
     // This creates a parquet file of 2 columns "name" and "service.name"
     // file has 3 record batches, each with 5 rows. They will be saved into 3 row groups
-    let reader = parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
+    let reader =
+        parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
 
     // column "name"
     Test {
         reader,
-        expected_min: Arc::new(StringArray::from(vec!["HTTP GET / DISPATCH", "HTTP PUT / DISPATCH", "HTTP GET / DISPATCH"])),
-        expected_max: Arc::new(StringArray::from(vec!["HTTP GET / DISPATCH", "HTTP PUT / DISPATCH", "HTTP GET / DISPATCH"])),
+        expected_min: Arc::new(StringArray::from(vec![
+            "HTTP GET / DISPATCH",
+            "HTTP PUT / DISPATCH",
+            "HTTP GET / DISPATCH",
+        ])),
+        expected_max: Arc::new(StringArray::from(vec![
+            "HTTP GET / DISPATCH",
+            "HTTP PUT / DISPATCH",
+            "HTTP GET / DISPATCH",
+        ])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
     }
     .run("name");
 
     // column "service.name"
-    let reader = parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
+    let reader =
+        parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
     Test {
         reader,
         expected_min: Arc::new(StringArray::from(vec!["frontend", "backend", "backend"])),
-        expected_max: Arc::new(StringArray::from(vec![ "frontend", "frontend", "backend"])),
+        expected_max: Arc::new(StringArray::from(vec![
+            "frontend", "frontend", "backend",
+        ])),
         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
     }
     .run("service.name");
 }
-
 // TODO:
 // WITHOUT Stats
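// Hedged sketch of the planned "WITHOUT stats" negative test: write the file
// with statistics disabled, then expect the extracted min/max arrays to be
// all-null. `parquet_file_many_columns` takes no writer properties today, so a
// variant accepting them is assumed here.
use parquet::file::properties::{EnabledStatistics, WriterProperties};

fn no_stats_writer_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build()
}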