Skip to content

Commit

Permalink
Merge 453122d into f1a831f
Browse files Browse the repository at this point in the history
  • Loading branch information
nevi-me authored Jun 30, 2021
2 parents f1a831f + 453122d commit 7907cc4
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 60 deletions.
159 changes: 143 additions & 16 deletions parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
}
}

/// Aggregate statistics computed from an Arrow array, carried alongside the
/// value batch so they can be forwarded to the Parquet column writer via
/// `write_batch_with_statistics`.
#[derive(Debug)]
struct Stats<T: DataType> {
    /// Minimum value, as the Parquet physical type `T::T`; populated from
    /// `arrow::compute::min` (presumably `None` for empty/all-null arrays —
    /// per the arrow aggregate-kernel contract).
    min: Option<T::T>,
    /// Maximum value; same provenance and `None` semantics as `min`.
    max: Option<T::T>,
    /// Number of null slots in the array (`array.null_count()` at every
    /// construction site in this change).
    nulls_count: Option<u64>,
    /// Distinct-value count; never computed in this change (always `None`),
    /// reserved for future use.
    distinct_count: Option<u64>,
}

/// Convenience method to get the next ColumnWriter from the RowGroupWriter
#[inline]
#[allow(clippy::borrowed_box)]
Expand Down Expand Up @@ -236,7 +244,7 @@ fn write_leaf(
let column = column.slice(levels.offset, levels.length);
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
let values = match column.data_type() {
let (values, stats) = match column.data_type() {
ArrowDataType::Date64 => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
Expand All @@ -250,7 +258,15 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
let values =
get_numeric_array_slice::<Int32Type, _>(&array, &indices);
let stats = Stats::<Int32Type> {
min: arrow::compute::min(&array),
max: arrow::compute::max(&array),
nulls_count: Some(array.null_count() as u64),
distinct_count: None,
};
(values, stats)
}
ArrowDataType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
Expand All @@ -259,46 +275,79 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::UInt32Array>()
.expect("Unable to get u32 array");
let stats = Stats::<Int32Type> {
min: arrow::compute::min(&array).map(|v| v as i32),
max: arrow::compute::max(&array).map(|v| v as i32),
nulls_count: Some(array.null_count() as u64),
distinct_count: None,
};
let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
array,
|x| x as i32,
);
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
let values =
get_numeric_array_slice::<Int32Type, _>(&array, &indices);
(values, stats)
}
_ => {
let array = arrow::compute::cast(&column, &ArrowDataType::Int32)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get i32 array");
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
let values =
get_numeric_array_slice::<Int32Type, _>(&array, &indices);
let stats = Stats::<Int32Type> {
min: arrow::compute::min(&array),
max: arrow::compute::max(&array),
nulls_count: Some(array.null_count() as u64),
distinct_count: None,
};
(values, stats)
}
};
typed.write_batch(
typed.write_batch_with_statistics(
values.as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
&stats.min,
&stats.max,
stats.nulls_count,
stats.distinct_count,
)?
}
ColumnWriter::BoolColumnWriter(ref mut typed) => {
let array = column
.as_any()
.downcast_ref::<arrow_array::BooleanArray>()
.expect("Unable to get boolean array");
typed.write_batch(
typed.write_batch_with_statistics(
get_bool_array_slice(&array, &indices).as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
&arrow::compute::min_boolean(&array),
&arrow::compute::max_boolean(&array),
Some(array.null_count() as u64),
None,
)?
}
ColumnWriter::Int64ColumnWriter(ref mut typed) => {
let values = match column.data_type() {
let (values, stats) = match column.data_type() {
ArrowDataType::Int64 => {
let array = column
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.expect("Unable to get i64 array");
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
let stats = Stats::<Int64Type> {
min: arrow::compute::min(&array),
max: arrow::compute::max(&array),
nulls_count: Some(array.null_count() as u64),
distinct_count: None,
};
(
get_numeric_array_slice::<Int64Type, _>(&array, &indices),
stats,
)
}
ArrowDataType::UInt64 => {
// follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
Expand All @@ -307,25 +356,47 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::UInt64Array>()
.expect("Unable to get u64 array");
let stats = Stats::<Int64Type> {
min: arrow::compute::min(&array).map(|v| v as i64),
max: arrow::compute::max(&array).map(|v| v as i64),
nulls_count: Some(array.null_count() as u64),
distinct_count: None,
};
let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
array,
|x| x as i64,
);
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
(
get_numeric_array_slice::<Int64Type, _>(&array, &indices),
stats,
)
}
_ => {
let array = arrow::compute::cast(&column, &ArrowDataType::Int64)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.expect("Unable to get i64 array");
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
let stats = Stats::<Int64Type> {
min: arrow::compute::min(&array),
max: arrow::compute::max(&array),
nulls_count: Some(array.null_count() as u64),
distinct_count: None,
};
(
get_numeric_array_slice::<Int64Type, _>(&array, &indices),
stats,
)
}
};
typed.write_batch(
typed.write_batch_with_statistics(
values.as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
&stats.min,
&stats.max,
stats.nulls_count,
stats.distinct_count,
)?
}
ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
Expand All @@ -336,21 +407,29 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::Float32Array>()
.expect("Unable to get Float32 array");
typed.write_batch(
typed.write_batch_with_statistics(
get_numeric_array_slice::<FloatType, _>(&array, &indices).as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
&arrow::compute::min(&array),
&arrow::compute::max(&array),
Some(array.null_count() as u64),
None,
)?
}
ColumnWriter::DoubleColumnWriter(ref mut typed) => {
let array = column
.as_any()
.downcast_ref::<arrow_array::Float64Array>()
.expect("Unable to get Float64 array");
typed.write_batch(
typed.write_batch_with_statistics(
get_numeric_array_slice::<DoubleType, _>(&array, &indices).as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
&arrow::compute::min(&array),
&arrow::compute::max(&array),
Some(array.null_count() as u64),
None,
)?
}
ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() {
Expand All @@ -370,10 +449,14 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.expect("Unable to get LargeBinaryArray array");
typed.write_batch(
typed.write_batch_with_statistics(
get_string_array(&array).as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
&arrow::compute::min_string(&array).map(|s| s.into()),
&arrow::compute::max_string(&array).map(|s| s.into()),
Some(array.null_count() as u64),
None,
)?
}
ArrowDataType::LargeBinary => {
Expand All @@ -392,10 +475,14 @@ fn write_leaf(
.as_any()
.downcast_ref::<arrow_array::LargeStringArray>()
.expect("Unable to get LargeUtf8 array");
typed.write_batch(
typed.write_batch_with_statistics(
get_large_string_array(&array).as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
&arrow::compute::min_string(&array).map(|s| s.into()),
&arrow::compute::max_string(&array).map(|s| s.into()),
Some(array.null_count() as u64),
None,
)?
}
_ => unreachable!("Currently unreachable because data type not supported"),
Expand Down Expand Up @@ -703,7 +790,6 @@ mod tests {
let batch =
RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();

// I think this setup is incorrect because this should pass
assert_eq!(batch.column(0).data().null_count(), 1);

let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
Expand All @@ -712,6 +798,47 @@ mod tests {
writer.close().unwrap();
}

#[test]
fn arrow_writer_list_non_null() {
    // Schema: one non-nullable list column "a" whose items are non-null i32s.
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
        false,
    )]);

    // Child values backing the nested lists:
    // [[1], [2, 3], [], [4, 5, 6], [7, 8, 9, 10]]
    let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    // Offsets delimiting the five lists above; the repeated 3 encodes the
    // empty list in the middle.
    let offsets = arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());

    // Assemble the ListArray from the offsets buffer plus the child data.
    let list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
        "item",
        DataType::Int32,
        false,
    ))))
    .len(5)
    .add_buffer(offsets)
    .add_child_data(values.data().clone())
    .build();
    let list = ListArray::from(list_data);

    // Wrap the column in a single-column record batch.
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(list)]).unwrap();

    // Sanity check: this column carries no nulls at any nesting level.
    assert_eq!(batch.column(0).data().null_count(), 0);

    // Round-trip through the writer; success is simply not panicking/erroring.
    let file = get_temp_file("test_arrow_writer_list_non_null.parquet", &[]);
    let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
}

#[test]
fn arrow_writer_binary() {
let string_field = Field::new("a", DataType::Utf8, false);
Expand Down
Loading

0 comments on commit 7907cc4

Please sign in to comment.