From 4af59a5d618d921f5741f980a836c2324c8b3db8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 11 Dec 2023 17:28:42 +0000 Subject: [PATCH] Reserve capacity for null padding --- .../array_reader/fixed_len_byte_array.rs | 34 +++++++++++++------ parquet/src/arrow/buffer/dictionary_buffer.rs | 11 ++++++ parquet/src/arrow/buffer/offset_buffer.rs | 8 +++++ parquet/src/arrow/record_reader/buffer.rs | 16 ++++++++- parquet/src/arrow/record_reader/mod.rs | 19 +++++++++-- 5 files changed, 74 insertions(+), 14 deletions(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index a0d25d403c1b..190f33b76f73 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -128,13 +128,14 @@ impl FixedLenByteArrayReader { data_type: ArrowType, byte_length: usize, ) -> Self { + let values = FixedLenByteArrayBuffer::new(byte_length); Self { data_type, byte_length, pages, def_levels_buffer: None, rep_levels_buffer: None, - record_reader: GenericRecordReader::new(column_desc), + record_reader: GenericRecordReader::new_with_values(column_desc, values), } } } @@ -232,14 +233,26 @@ impl ArrayReader for FixedLenByteArrayReader { } } -#[derive(Default)] struct FixedLenByteArrayBuffer { buffer: Vec, /// The length of each element in bytes - byte_length: Option, + byte_length: usize, +} + +impl FixedLenByteArrayBuffer { + fn new(byte_length: usize) -> Self { + Self { + byte_length, + buffer: Default::default(), + } + } } impl ValuesBuffer for FixedLenByteArrayBuffer { + fn reserve(&mut self, additional: usize) { + self.buffer.reserve(additional * self.byte_length); + } + fn pad_nulls( &mut self, read_offset: usize, @@ -247,8 +260,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { levels_read: usize, valid_mask: &[u8], ) { - let byte_length = self.byte_length.unwrap_or_default(); - + let byte_length = self.byte_length; assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length); self.buffer .resize((read_offset + levels_read) * byte_length, 0); @@ -268,6 +280,13 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { } } } + + fn take(&mut self) -> Self { + Self { + buffer: std::mem::take(&mut self.buffer), + byte_length: self.byte_length, + } + } } struct ValueDecoder { @@ -345,11 +364,6 @@ impl ColumnValueDecoder for ValueDecoder { } fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { - match out.byte_length { - Some(x) => assert_eq!(x, self.byte_length), - None => out.byte_length = Some(self.byte_length), - } - match self.decoder.as_mut().unwrap() { Decoder::Plain { offset, buf } => { let to_read = diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs index 9e5b2293aa01..f09079311c8b 100644 --- a/parquet/src/arrow/buffer/dictionary_buffer.rs +++ b/parquet/src/arrow/buffer/dictionary_buffer.rs @@ -185,6 +185,13 @@ impl DictionaryBuffer { } impl ValuesBuffer for DictionaryBuffer { + fn reserve(&mut self, additional: usize) { + match self { + Self::Dict { keys, .. } => keys.reserve(additional), + Self::Values { values, .. } => values.reserve(additional), + } + } + fn pad_nulls( &mut self, read_offset: usize, @@ -202,6 +209,10 @@ impl ValuesBuffer for DictionaryBuffer Self { + std::mem::take(self) + } } #[cfg(test)] diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index ce9eb1142a5b..0e77cf64da4c 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -141,6 +141,10 @@ impl OffsetBuffer { } impl ValuesBuffer for OffsetBuffer { + fn reserve(&mut self, additional: usize) { + self.offsets.reserve(additional) + } + fn pad_nulls( &mut self, read_offset: usize, @@ -188,6 +192,10 @@ impl ValuesBuffer for OffsetBuffer { *x = last_start_offset } } + + fn take(&mut self) -> Self { + std::mem::take(self) + } } #[cfg(test)] diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 880407a54745..6fb9872b2dbe 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -18,7 +18,10 @@ use crate::arrow::buffer::bit_util::iter_set_bits_rev; /// A buffer that supports padding with nulls -pub trait ValuesBuffer: Default { +pub trait ValuesBuffer { + /// Reserve space for `additional` values + fn reserve(&mut self, additional: usize); + /// If a column contains nulls, more level data may be read than value data, as null /// values are not encoded. Therefore, first the levels data is read, the null count /// determined, and then the corresponding number of values read to a [`ValuesBuffer`]. @@ -40,9 +43,16 @@ pub trait ValuesBuffer: Default { levels_read: usize, valid_mask: &[u8], ); + + /// Take the contents of this buffer + fn take(&mut self) -> Self; } impl ValuesBuffer for Vec { + fn reserve(&mut self, additional: usize) { + self.reserve(additional) + } + fn pad_nulls( &mut self, read_offset: usize, @@ -61,4 +71,8 @@ impl ValuesBuffer for Vec { self[level_pos] = self[value_pos]; } } + + fn take(&mut self) -> Self { + std::mem::take(self) + } } diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index 7456da053b9c..15b04e16fd54 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -62,18 +62,28 @@ pub struct GenericRecordReader { impl GenericRecordReader where - V: ValuesBuffer, + V: ValuesBuffer + Default, CV: ColumnValueDecoder, { /// Create a new [`GenericRecordReader`] pub fn new(desc: ColumnDescPtr) -> Self { + Self::new_with_values(desc, V::default()) + } +} + +impl GenericRecordReader +where + V: ValuesBuffer, + CV: ColumnValueDecoder, +{ + pub fn new_with_values(desc: ColumnDescPtr, values: V) -> Self { let def_levels = (desc.max_def_level() > 0) .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc))); let rep_levels = (desc.max_rep_level() > 0).then(Vec::new); Self { - values: V::default(), + values, def_levels, rep_levels, column_reader: None, @@ -169,7 +179,7 @@ where /// Returns currently stored buffer data. /// The side effect is similar to `consume_def_levels`. pub fn consume_record_data(&mut self) -> V { - std::mem::take(&mut self.values) + self.values.take() } /// Returns currently stored null bitmap data. @@ -195,6 +205,9 @@ where /// Try to read one batch of data returning the number of records read fn read_one_batch(&mut self, batch_size: usize) -> Result { + // Reserve additional space in values for null padding + self.values.reserve(batch_size); + let (records_read, values_read, levels_read) = self.column_reader.as_mut().unwrap().read_records( batch_size,