Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zero-copy Vec conversion (#3516) (#1176) #3756

Merged
merged 6 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions arrow-array/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ mod tests {

#[test]
#[should_panic(expected = "memory is not aligned")]
#[allow(deprecated)]
fn test_primitive_array_alignment() {
let ptr = arrow_buffer::alloc::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
Expand All @@ -845,6 +846,7 @@ mod tests {
// Different error messages, so skip for now
// https://github.com/apache/arrow-rs/issues/1545
#[cfg(not(feature = "force_validate"))]
#[allow(deprecated)]
fn test_list_array_alignment() {
let ptr = arrow_buffer::alloc::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
Expand Down
14 changes: 9 additions & 5 deletions arrow-buffer/src/alloc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn dangling_ptr() -> NonNull<u8> {
/// Allocates a cache-aligned memory region of `size` bytes with uninitialized values.
/// This is more performant than using [allocate_aligned_zeroed] when all bytes will have
/// an unknown or non-zero value and is semantically similar to `malloc`.
#[deprecated(note = "Use Vec")]
pub fn allocate_aligned(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
Expand All @@ -60,6 +61,7 @@ pub fn allocate_aligned(size: usize) -> NonNull<u8> {
/// Allocates a cache-aligned memory region of `size` bytes with `0` on all of them.
/// This is more performant than using [allocate_aligned] and setting all bytes to zero
/// and is semantically similar to `calloc`.
#[deprecated(note = "Use Vec")]
pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
Expand All @@ -80,6 +82,7 @@ pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
/// * ptr must denote a block of memory currently allocated via this allocator,
///
/// * size must be the same size that was used to allocate that block of memory,
#[deprecated(note = "Use Vec")]
pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
if size != 0 {
std::alloc::dealloc(
Expand All @@ -100,6 +103,8 @@ pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
///
/// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e.,
/// the rounded value must be less than usize::MAX).
#[deprecated(note = "Use Vec")]
#[allow(deprecated)]
pub unsafe fn reallocate(
ptr: NonNull<u8>,
old_size: usize,
Expand Down Expand Up @@ -132,9 +137,8 @@ impl<T: RefUnwindSafe + Send + Sync> Allocation for T {}

/// Mode of deallocating memory regions
pub(crate) enum Deallocation {
/// An allocation of the given capacity that needs to be deallocated using arrows's cache aligned allocator.
/// See [allocate_aligned] and [free_aligned].
Arrow(usize),
/// An allocation using [`std::alloc`]
Standard(Layout),
/// An allocation from an external source like the FFI interface or a Rust Vec.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the comment for Custom should be updated to say like "external Rust Vec" or maybe remove the reference to Rust Vec entirely

/// Deallocation will happen
Custom(Arc<dyn Allocation>),
Expand All @@ -143,8 +147,8 @@ pub(crate) enum Deallocation {
impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Arrow(capacity) => {
write!(f, "Deallocation::Arrow {{ capacity: {capacity} }}")
Deallocation::Standard(layout) => {
write!(f, "Deallocation::Standard {layout:?}")
}
Deallocation::Custom(_) => {
write!(f, "Deallocation::Custom {{ capacity: unknown }}")
Expand Down
147 changes: 142 additions & 5 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::AsRef;
use std::alloc::Layout;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::ptr::NonNull;
use std::sync::Arc;

use crate::alloc::{Allocation, Deallocation};
use crate::alloc::{Allocation, Deallocation, ALIGNMENT};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{bytes::Bytes, native::ArrowNativeType};

Expand All @@ -42,6 +42,8 @@ pub struct Buffer {
ptr: *const u8,

/// Byte length of the buffer.
///
/// Must be less than or equal to `data.len()`
length: usize,
}

Expand Down Expand Up @@ -69,6 +71,21 @@ impl Buffer {
}
}

/// Create a [`Buffer`] from the provided `Vec` without copying
#[inline]
pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't implement From as this would conflict with the blanket From<AsRef<[u8]>>

// Safety
// Vec::as_ptr guaranteed to not be null and ArrowNativeType are trivially transmutable
let ptr = unsafe { NonNull::new_unchecked(vec.as_ptr() as _) };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like into_raw_parts would be ideal here, but sadly it appears to not yet be stabalized

https://doc.rust-lang.org/beta/std/vec/struct.Vec.html#method.into_raw_parts

I note that the docs say

After calling this function, the caller is responsible for the memory previously managed by the Vec. The only way to do this is to convert the raw pointer, length, and capacity back into a Vec with the from_raw_parts function, allowing the destructor to perform the cleanup.

However, this code uses Deallocation::Standard(layout) to deallocate memory, which seems ok, I just wanted to point out it doesn't match the docs (though maybe this is fine)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs for dealloc are authoritative here

ptr must denote a block of memory currently allocated via this allocator
layout must be the same layout that was used to allocate that block of memory

The layout of arrays, vecs, etc... is fixed and defined by Layout::array. As such we are fine provided we obey the same logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to store/create from layouts other than Vec? Otherwise we can just create from Vec, forget the owned Vector.

On drop we recreate the Vec so that the drop sequence is executed.

This would defer all this logic to the implementation in std.

Copy link
Contributor Author

@tustvold tustvold Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to store/create from layouts other than Vec? Otherwise we can just create from Vec, forget the owned Vector.

Eventually we may deprecate and remove support for other layouts, but at least for a period we need to support aligned layouts such as those created by MutableBuffer so that we can avoid stop-the-world changes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, maybe we could add a comment that once we remove that support we could remove the unsafe layout related code and do something similar to what we do here: https://github.com/DataEngineeringLabs/foreign_vec/blob/0d38968facee8a81748ec380fad78379d806fe1d/src/lib.rs#L25

Copy link
Contributor Author

@tustvold tustvold Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsafe layout related code and do something similar to what we do here

What would be the advantage of this? Does polars depend on the foreign vec interface? It doesn't seem to be doing anything materially different?

Edit: In fact I'm fairly sure this line is technically unsound - https://github.com/DataEngineeringLabs/foreign_vec/blob/0d38968facee8a81748ec380fad78379d806fe1d/src/lib.rs#L49, there are a lot of safety constraints on from_raw_parts that appear to be not being checked or documented?

let len = vec.len() * std::mem::size_of::<T>();
// Safety
// Layout guaranteed to be valid
let layout = unsafe { Layout::array::<T>(vec.capacity()).unwrap_unchecked() };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to use unwrap_unchecked here? Maybe it would be better to panic if the layout was messed up somehow?

Looks like it errors on overflow https://doc.rust-lang.org/beta/std/alloc/struct.Layout.html#method.array

Like maybe the size overflows because someone passed in a giant Vec or something?

🤔

Copy link
Contributor Author

@tustvold tustvold Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to use unwrap_unchecked here

This is a clone of Vec::current_memory, I will add a link.

The TLDR is that if the Vec is valid, it must have a valid layout (otherwise among other issues Vec wouldn't be able to deallocate itself).

std::mem::forget(vec);
let b = unsafe { Bytes::new(ptr, len, Deallocation::Standard(layout)) };
Self::from_bytes(b)
}

/// Initializes a [Buffer] from a slice of items.
pub fn from_slice_ref<U: ArrowNativeType, T: AsRef<[U]>>(items: T) -> Self {
let slice = items.as_ref();
Expand All @@ -78,7 +95,7 @@ impl Buffer {
buffer.into()
}

/// Creates a buffer from an existing memory region (must already be byte-aligned), this
/// Creates a buffer from an existing aligned memory region (must already be byte-aligned), this
/// `Buffer` will free this piece of memory when dropped.
///
/// # Arguments
Expand All @@ -91,9 +108,11 @@ impl Buffer {
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
#[deprecated(note = "Use From<Vec<T>>")]
pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
assert!(len <= capacity);
Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
let layout = Layout::from_size_align(capacity, ALIGNMENT).unwrap();
Buffer::build_with_arguments(ptr, len, Deallocation::Standard(layout))
}

/// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
Expand Down Expand Up @@ -253,7 +272,8 @@ impl Buffer {
}

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
/// Returns `Err` if this is shared or its allocation is from an external source.
/// Returns `Err` if this is shared or its allocation is from an external source or
/// it is not allocated with alignment [`ALIGNMENT`]
pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
let ptr = self.ptr;
let length = self.length;
Expand All @@ -269,6 +289,43 @@ impl Buffer {
length,
})
}

/// Returns `Vec` for mutating the buffer if this buffer is not offset and was
/// allocated with the correct layout for `Vec<T>`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to explicitly say here an error is returned if the buffer can't be converted to a Vec (and ideally hint how to get a Vec out of it anyways (perhaps by copying)

pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
let layout = match self.data.deallocation() {
Deallocation::Standard(l) => l,
_ => return Err(self), // Custom allocation
};

if self.ptr != self.data.as_ptr() {
return Err(self); // Data is offset
}

let v_capacity = layout.size() / std::mem::size_of::<T>();
match Layout::array::<T>(v_capacity) {
Ok(expected) if layout == &expected => {}
_ => return Err(self), // Incorrect layout
}

let length = self.length;
let ptr = self.ptr;
let v_len = self.length / std::mem::size_of::<T>();

Arc::try_unwrap(self.data)
.map(|bytes| unsafe {
let ptr = bytes.ptr().as_ptr() as _;
std::mem::forget(bytes);
// Safety
// Verified that bytes layout matches that of Vec
Vec::from_raw_parts(ptr, v_len, v_capacity)
})
.map_err(|bytes| Buffer {
data: bytes,
ptr,
length,
})
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
Expand Down Expand Up @@ -378,6 +435,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer {

#[cfg(test)]
mod tests {
use crate::i256;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::thread;

Expand Down Expand Up @@ -632,4 +690,83 @@ mod tests {
let buffer = Buffer::from(MutableBuffer::from_len_zeroed(12));
buffer.slice_with_length(2, usize::MAX);
}

#[test]
fn test_vec_interop() {
Copy link
Contributor Author

@tustvold tustvold Feb 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding Vec conversion at the Buffer level does allow for transmuting types provided they have the same layout, this is perhaps not ideal but isn't harmful.

Long-term I hope to deprecate and remove Buffer and MutableBuffer, just leaving ScalarBuffer which statically prevents this sort of type-conversion, but doing it this way allows for a gradual migration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is perhaps not ideal but isn't harmful.

Is it a problem for something like converting integers to (invalid) floats 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, there is a good justification for why this is perfectly fine here. The only place where this becomes problematic is where types have bit sequences that illegal, e.g. NonZeroU32 or bool, all ArrowNativeType are valid for all bit sequences.

// Test empty vec
let a: Vec<i128> = Vec::new();
let b = Buffer::from_vec(a);
b.into_vec::<i128>().unwrap();

// Test vec with capacity
let a: Vec<i128> = Vec::with_capacity(20);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 0);
assert_eq!(back.capacity(), 20);

// Test vec with values
let mut a: Vec<i128> = Vec::with_capacity(3);
a.extend_from_slice(&[1, 2, 3]);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 3);
assert_eq!(back.capacity(), 3);

// Test vec with values and spare capacity
let mut a: Vec<i128> = Vec::with_capacity(20);
a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 7);
assert_eq!(back.capacity(), 20);

// Test incorrect alignment
let a: Vec<i128> = Vec::new();
let b = Buffer::from_vec(a);
let b = b.into_vec::<i32>().unwrap_err();
b.into_vec::<i8>().unwrap_err();

// Test convert between types with same alignment
// This is an implementation quirk, but isn't harmful
// as ArrowNativeType are trivially transmutable
let a: Vec<i64> = vec![1, 2, 3, 4];
let b = Buffer::from_vec(a);
let back = b.into_vec::<u64>().unwrap();
assert_eq!(back.len(), 4);
assert_eq!(back.capacity(), 4);

// i256 has the same layout as i128 so this is valid
let mut b: Vec<i128> = Vec::with_capacity(4);
b.extend_from_slice(&[1, 2, 3, 4]);
let b = Buffer::from_vec(b);
let back = b.into_vec::<i256>().unwrap();
assert_eq!(back.len(), 2);
assert_eq!(back.capacity(), 2);

// Invalid layout
let b: Vec<i128> = vec![1, 2, 3];
let b = Buffer::from_vec(b);
b.into_vec::<i256>().unwrap_err();

// Invalid layout
let mut b: Vec<i128> = Vec::with_capacity(5);
b.extend_from_slice(&[1, 2, 3, 4]);
let b = Buffer::from_vec(b);
b.into_vec::<i256>().unwrap_err();

// Truncates length
// This is an implementation quirk, but isn't harmful
let mut b: Vec<i128> = Vec::with_capacity(4);
b.extend_from_slice(&[1, 2, 3]);
let b = Buffer::from_vec(b);
let back = b.into_vec::<i256>().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is also worth a test when the capacity doesn't equally divide equally into the transmuted size -- like maybe change this test to have capacity 5 with i128 and verify the i256 output Vec only has capacity 2

Copy link
Contributor Author

@tustvold tustvold Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A capacity of 5 is tested above and results in an error, as the layout of the underlying allocation is invalid

assert_eq!(back.len(), 1);
assert_eq!(back.capacity(), 2);

// Cannot use aligned allocation
let b = Buffer::from(MutableBuffer::new(10));
let b = b.into_vec::<u8>().unwrap_err();
b.into_vec::<u64>().unwrap_err();
}
}
30 changes: 21 additions & 9 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,28 @@
// under the License.

use super::Buffer;
use crate::alloc::Deallocation;
use crate::alloc::{Deallocation, ALIGNMENT};
use crate::{
alloc,
bytes::Bytes,
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
use std::alloc::Layout;
use std::mem;
use std::ptr::NonNull;

/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
///
/// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned
/// along cache lines and in multiple of 64 bytes.
///
/// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice]
/// to insert many items, and `into` to convert it to [`Buffer`].
///
/// For a safe, strongly typed API consider using `arrow::array::BufferBuilder`
/// For a safe, strongly typed API consider using `Vec`
///
/// Note: this may be deprecated in a future release (#1176)[https://github.com/apache/arrow-rs/issues/1176]
///
/// # Example
///
Expand Down Expand Up @@ -62,6 +67,7 @@ impl MutableBuffer {

/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
#[inline]
#[allow(deprecated)]
pub fn with_capacity(capacity: usize) -> Self {
let capacity = bit_util::round_upto_multiple_of_64(capacity);
let ptr = alloc::allocate_aligned(capacity);
Expand All @@ -83,6 +89,7 @@ impl MutableBuffer {
/// let data = buffer.as_slice_mut();
/// assert_eq!(data[126], 0u8);
/// ```
#[allow(deprecated)]
pub fn from_len_zeroed(len: usize) -> Self {
let new_capacity = bit_util::round_upto_multiple_of_64(len);
let ptr = alloc::allocate_aligned_zeroed(new_capacity);
Expand All @@ -95,12 +102,14 @@ impl MutableBuffer {

/// Allocates a new [MutableBuffer] from given `Bytes`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is worth updating these docs to explain when an Err is returned

pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
return Err(bytes);
}
let capacity = match bytes.deallocation() {
Deallocation::Standard(layout) if layout.align() == ALIGNMENT => {
layout.size()
}
_ => return Err(bytes),
};

let len = bytes.len();
let capacity = bytes.capacity();
let ptr = bytes.ptr();
mem::forget(bytes);

Expand Down Expand Up @@ -224,6 +233,7 @@ impl MutableBuffer {
/// buffer.shrink_to_fit();
/// assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
/// ```
#[allow(deprecated)]
pub fn shrink_to_fit(&mut self) {
let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
if new_capacity < self.capacity {
Expand Down Expand Up @@ -300,9 +310,9 @@ impl MutableBuffer {

#[inline]
pub(super) fn into_buffer(self) -> Buffer {
let bytes = unsafe {
Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity))
};
let layout = Layout::from_size_align(self.capacity, ALIGNMENT).unwrap();
let bytes =
unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(layout)) };
std::mem::forget(self);
Buffer::from_bytes(bytes)
}
Expand Down Expand Up @@ -448,6 +458,7 @@ impl MutableBuffer {
/// # Safety
/// `ptr` must be allocated for `old_capacity`.
#[cold]
#[allow(deprecated)]
unsafe fn reallocate(
ptr: NonNull<u8>,
old_capacity: usize,
Expand Down Expand Up @@ -630,6 +641,7 @@ impl std::ops::DerefMut for MutableBuffer {
}

impl Drop for MutableBuffer {
#[allow(deprecated)]
fn drop(&mut self) {
unsafe { alloc::free_aligned(self.data, self.capacity) };
}
Expand Down
Loading