diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index a0d25d403c1b..8ef228b1f0f0 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -254,7 +254,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { .resize((read_offset + levels_read) * byte_length, 0); let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + for (value_pos, level_pos) in values_range + .rev() + .zip(iter_set_bits_rev(valid_mask, read_offset + levels_read)) + { debug_assert!(level_pos >= value_pos); if level_pos <= value_pos { break; diff --git a/parquet/src/arrow/buffer/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs index e7aea56a7f05..25d2f57911e7 100644 --- a/parquet/src/arrow/buffer/bit_util.rs +++ b/parquet/src/arrow/buffer/bit_util.rs @@ -25,8 +25,7 @@ pub fn count_set_bits(bytes: &[u8], range: Range) -> usize { } /// Iterates through the set bit positions in `bytes` in reverse order -pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator + '_ { - let bit_length = bytes.len() * 8; +pub fn iter_set_bits_rev(bytes: &[u8], bit_length: usize) -> impl Iterator + '_ { let unaligned = UnalignedBitChunk::new(bytes, 0, bit_length); let mut chunk_end_idx = bit_length + unaligned.lead_padding() + unaligned.trailing_padding(); @@ -78,7 +77,7 @@ mod tests { let mut nulls = BooleanBufferBuilder::new(mask_length); bools.iter().for_each(|b| nulls.append(*b)); - let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect(); + let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice(), mask_length).collect(); let expected: Vec<_> = bools .iter() .enumerate() @@ -87,7 +86,7 @@ mod tests { .collect(); assert_eq!(actual, expected); - assert_eq!(iter_set_bits_rev(&[]).count(), 0); + assert_eq!(iter_set_bits_rev(&[], 0).count(), 0); assert_eq!(count_set_bits(&[], 0..0), 0); assert_eq!(count_set_bits(&[0xFF], 1..1), 0); diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs index 9e5b2293aa01..3255825bae6b 100644 --- a/parquet/src/arrow/buffer/dictionary_buffer.rs +++ b/parquet/src/arrow/buffer/dictionary_buffer.rs @@ -194,7 +194,6 @@ impl ValuesBuffer for DictionaryBuffer { - keys.resize(read_offset + levels_read, K::default()); keys.pad_nulls(read_offset, values_read, levels_read, valid_mask) } Self::Values { values, .. } => { diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index ce9eb1142a5b..a0e7abcac5db 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -161,7 +161,7 @@ impl ValuesBuffer for OffsetBuffer { for (value_pos, level_pos) in values_range .clone() .rev() - .zip(iter_set_bits_rev(valid_mask)) + .zip(iter_set_bits_rev(valid_mask, read_offset + levels_read)) { assert!(level_pos >= value_pos); assert!(level_pos < last_pos); diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 880407a54745..dad97c8fd51c 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -50,15 +50,20 @@ impl ValuesBuffer for Vec { levels_read: usize, valid_mask: &[u8], ) { + assert!(values_read <= levels_read); self.resize(read_offset + levels_read, T::default()); let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + for (value_pos, level_pos) in values_range + .rev() + .zip(iter_set_bits_rev(valid_mask, read_offset + levels_read)) + { debug_assert!(level_pos >= value_pos); if level_pos <= value_pos { break; } - self[level_pos] = self[value_pos]; + // Safety: indices must be in bounds by construction + unsafe { *self.get_unchecked_mut(level_pos) = *self.get_unchecked(value_pos) } } } }