diff --git a/Cargo.lock b/Cargo.lock index f8cdc9ca06e4..0f71b3d7f619 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -757,7 +757,7 @@ dependencies = [ "futures-lite 2.3.0", "parking", "polling 3.7.3", - "rustix 0.38.34", + "rustix 0.38.37", "slab", "tracing", "windows-sys 0.59.0", @@ -1951,7 +1951,7 @@ dependencies = [ "io-lifetimes 2.0.3", "ipnet", "maybe-owned", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", "winx", ] @@ -1975,7 +1975,7 @@ dependencies = [ "cap-primitives", "io-extras", "io-lifetimes 2.0.3", - "rustix 0.38.34", + "rustix 0.38.37", ] [[package]] @@ -1988,7 +1988,7 @@ dependencies = [ "cap-primitives", "iana-time-zone", "once_cell", - "rustix 0.38.34", + "rustix 0.38.37", "winx", ] @@ -3265,6 +3265,7 @@ dependencies = [ "rand 0.8.5", "regex", "replace_with", + "rustix 0.38.37", "semver", "serde", "serde_json", @@ -5386,6 +5387,7 @@ dependencies = [ "log", "parking_lot 0.12.3", "rayon", + "rustix 0.38.37", "siphasher", "tempfile", ] @@ -6408,7 +6410,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e5768da2206272c81ef0b5e951a41862938a6070da63bcea197899942d3b947" dependencies = [ "cfg-if", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", ] @@ -6689,7 +6691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "033b337d725b97690d86893f9de22b67b80dcc4e9ad815f348254c38119db8fb" dependencies = [ "io-lifetimes 2.0.3", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", ] @@ -6709,7 +6711,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7e180ac76c23b45e767bd7ae9579bc0bb458618c4bc71835926e098e61d15f8" dependencies = [ - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", ] @@ -7545,7 +7547,7 @@ dependencies = [ "itoa", "libc", "memmap2 0.9.4", - "rustix 0.38.34", + "rustix 0.38.37", "smallvec", "thiserror", ] @@ -7707,7 +7709,7 @@ dependencies = [ "gix-command", "gix-config-value", "parking_lot 0.12.3", - "rustix 0.38.34", + "rustix 0.38.37", "thiserror", ] @@ -9518,9 +9520,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" [[package]] name = "libflate" @@ -9933,7 +9935,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" dependencies = [ - "rustix 0.38.34", + "rustix 0.38.37", ] [[package]] @@ -11660,7 +11662,7 @@ dependencies = [ "concurrent-queue", "hermit-abi 0.4.0", "pin-project-lite", - "rustix 0.38.34", + "rustix 0.38.37", "tracing", "windows-sys 0.59.0", ] @@ -11911,7 +11913,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix 0.38.34", + "rustix 0.38.37", ] [[package]] @@ -13232,9 +13234,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" dependencies = [ "bitflags 2.6.0", "errno", @@ -14755,7 +14757,7 @@ dependencies = [ "cap-std", "fd-lock", "io-lifetimes 2.0.3", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.52.0", "winx", ] @@ -14945,7 +14947,7 @@ dependencies = [ 
"cfg-if", "fastrand 2.1.0", "once_cell", - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.59.0", ] @@ -14974,7 +14976,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7" dependencies = [ - "rustix 0.38.34", + "rustix 0.38.37", "windows-sys 0.48.0", ] @@ -16069,7 +16071,7 @@ dependencies = [ "io-lifetimes 2.0.3", "log", "once_cell", - "rustix 0.38.34", + "rustix 0.38.37", "system-interface", "thiserror", "tracing", @@ -16235,7 +16237,7 @@ dependencies = [ "once_cell", "paste", "rayon", - "rustix 0.38.34", + "rustix 0.38.37", "semver", "serde", "serde_derive", @@ -16278,7 +16280,7 @@ dependencies = [ "bincode 1.3.3", "directories-next", "log", - "rustix 0.38.34", + "rustix 0.38.37", "serde", "serde_derive", "sha2", @@ -16367,7 +16369,7 @@ dependencies = [ "anyhow", "cc", "cfg-if", - "rustix 0.38.34", + "rustix 0.38.37", "wasmtime-asm-macros", "wasmtime-versioned-export-macros", "windows-sys 0.52.0", @@ -16381,7 +16383,7 @@ checksum = "983ca409f2cd66385ce49486c022da0128acb7910c055beb5230998b49c6084c" dependencies = [ "object 0.33.0", "once_cell", - "rustix 0.38.34", + "rustix 0.38.37", "wasmtime-versioned-export-macros", ] @@ -16414,7 +16416,7 @@ dependencies = [ "memoffset", "paste", "psm", - "rustix 0.38.34", + "rustix 0.38.37", "sptr", "wasm-encoder 0.202.0", "wasmtime-asm-macros", diff --git a/Cargo.toml b/Cargo.toml index 2e207de5095e..1c07a86138c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -247,6 +247,7 @@ http = "1" itertools = "0.10.5" jsonb = "0.4.1" jwt-simple = "0.11.0" +libc = { version = "0.2.158" } match-template = "0.0.1" mysql_async = { version = "0.34", default-features = false, features = ["native-tls-tls"] } object_store_opendal = "0.46" diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 62a88b3efce1..ab434f8558db 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -31,7 +31,7 @@ databend-common-exception = { workspace = true } enquote = "1.1.0" fastrace = { workspace = true } futures = { workspace = true } -libc = "0.2.153" +libc = { workspace = true } log = { workspace = true } logcall = { workspace = true } micromarshal = "0.5.0" @@ -50,6 +50,7 @@ prometheus-parse = "0.2.3" rand = { workspace = true, features = ["serde1"] } regex = { workspace = true } replace_with = "0.1.7" +rustix = "0.38.37" semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs new file mode 100644 index 000000000000..063b9d5467fd --- /dev/null +++ b/src/common/base/src/base/dma.rs @@ -0,0 +1,495 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::alloc::AllocError; +use std::alloc::Allocator; +use std::alloc::Global; +use std::alloc::Layout; +use std::io; +use std::io::IoSlice; +use std::io::SeekFrom; +use std::ops::Range; +use std::os::fd::BorrowedFd; +use std::os::unix::io::AsRawFd; +use std::path::Path; +use std::ptr::Alignment; +use std::ptr::NonNull; + +use rustix::fs::OFlags; +use tokio::fs::File; +use tokio::io::AsyncSeekExt; + +use crate::runtime::spawn_blocking; + +unsafe impl Send for DmaAllocator {} + +pub struct DmaAllocator(Alignment); + +impl DmaAllocator { + pub fn new(align: Alignment) -> DmaAllocator { + DmaAllocator(align) + } + + fn real_layout(&self, layout: Layout) -> Layout { + Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap() + } + + fn real_cap(&self, cap: usize) -> usize { + align_up(self.0, cap) + } +} + +unsafe impl Allocator for DmaAllocator { + fn allocate(&self, layout: Layout) -> Result, AllocError> { + Global {}.allocate(self.real_layout(layout)) + } + + fn allocate_zeroed(&self, layout: Layout) -> Result, AllocError> { + Global {}.allocate_zeroed(self.real_layout(layout)) + } + + unsafe fn grow( + &self, + ptr: NonNull, + old_layout: Layout, + new_layout: Layout, + ) -> Result, AllocError> { + Global {}.grow( + ptr, + self.real_layout(old_layout), + self.real_layout(new_layout), + ) + } + + unsafe fn grow_zeroed( + &self, + ptr: NonNull, + old_layout: Layout, + new_layout: Layout, + ) -> Result, AllocError> { + Global {}.grow_zeroed( + ptr, + self.real_layout(old_layout), + self.real_layout(new_layout), + ) + } + + unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: Layout) { + Global {}.deallocate(ptr, self.real_layout(layout)) + } +} + +type DmaBuffer = Vec; + +pub fn dma_buffer_as_vec(mut buf: DmaBuffer) -> Vec { + let ptr = buf.as_mut_ptr(); + let len = buf.len(); + let cap = buf.allocator().real_cap(buf.capacity()); + std::mem::forget(buf); + + unsafe { Vec::from_raw_parts(ptr, len, cap) } +} + +/// A `DmaFile` is similar to a `File`, but it is opened with the `O_DIRECT` file in order to +/// perform direct IO. +struct DmaFile { + fd: File, + alignment: Alignment, + buf: Option, +} + +impl DmaFile { + /// Attempts to open a file in read-only mode. + async fn open(path: impl AsRef) -> io::Result { + let file = File::options() + .read(true) + .custom_flags(OFlags::DIRECT.bits() as i32) + .open(path) + .await?; + + open_dma(file).await + } + + /// Opens a file in write-only mode. + async fn create(path: impl AsRef) -> io::Result { + let file = File::options() + .write(true) + .create(true) + .truncate(true) + .custom_flags((OFlags::DIRECT | OFlags::EXCL).bits() as i32) + .open(path) + .await?; + + open_dma(file).await + } + + fn set_buffer(&mut self, buf: DmaBuffer) { + self.buf = Some(buf) + } + + /// Aligns `value` up to the memory alignment requirement for this file. + pub fn align_up(&self, value: usize) -> usize { + align_up(self.alignment, value) + } + + /// Aligns `value` down to the memory alignment requirement for this file. + pub fn align_down(&self, value: usize) -> usize { + align_down(self.alignment, value) + } + + /// Return the alignment requirement for this file. 
The returned alignment value can be used + /// to allocate a buffer to use with this file: + #[expect(dead_code)] + pub fn alignment(&self) -> Alignment { + self.alignment + } + + fn buffer(&self) -> &DmaBuffer { + self.buf.as_ref().unwrap() + } + + fn mut_buffer(&mut self) -> &mut DmaBuffer { + self.buf.as_mut().unwrap() + } + + fn write_direct(&mut self) -> io::Result { + let buf = self.buffer(); + match rustix::io::write(&self.fd, buf) { + Ok(n) => { + if n != buf.len() { + return Err(io::Error::new(io::ErrorKind::Other, "short write")); + } + self.mut_buffer().clear(); + Ok(n) + } + Err(e) => Err(e.into()), + } + } + + fn read_direct(&mut self, n: usize) -> io::Result { + let Self { fd, buf, .. } = self; + let buf = buf.as_mut().unwrap(); + if n > buf.capacity() - buf.len() { + return Err(io::Error::new(io::ErrorKind::Other, "buf not sufficient")); + } + let start = buf.len(); + unsafe { buf.set_len(buf.len() + n) }; + match rustix::io::read(fd, &mut (*buf)[start..]) { + Ok(n) => { + buf.truncate(start + n); + Ok(n) + } + Err(e) => Err(e.into()), + } + } + + fn truncate(&self, length: usize) -> io::Result<()> { + rustix::fs::ftruncate(&self.fd, length as u64).map_err(|e| e.into()) + } + + async fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.fd.seek(pos).await + } +} + +pub fn align_up(alignment: Alignment, value: usize) -> usize { + (value + alignment.as_usize() - 1) & alignment.mask() +} + +pub fn align_down(alignment: Alignment, value: usize) -> usize { + value & alignment.mask() +} + +async fn open_dma(file: File) -> io::Result { + let stat = fstatvfs(&file).await?; + let alignment = Alignment::new(stat.f_bsize.max(512) as usize).unwrap(); + + Ok(DmaFile { + fd: file, + alignment, + buf: None, + }) +} + +async fn fstatvfs(file: &File) -> io::Result { + let fd = file.as_raw_fd(); + asyncify(move || { + let fd = unsafe { BorrowedFd::borrow_raw(fd) }; + rustix::fs::fstatvfs(fd).map_err(|e| e.into()) + }) + .await +} + +async fn asyncify(f: F) -> io::Result +where + F: FnOnce() -> io::Result + Send + 'static, + T: Send + 'static, +{ + match spawn_blocking(f).await { + Ok(res) => res, + Err(_) => Err(io::Error::new( + io::ErrorKind::Other, + "background task failed", + )), + } +} + +pub async fn dma_write_file_vectored<'a>( + path: impl AsRef, + bufs: &'a [IoSlice<'a>], +) -> io::Result { + let mut file = DmaFile::create(path.as_ref()).await?; + + let file_length = bufs.iter().map(|buf| buf.len()).sum(); + if file_length == 0 { + return Ok(0); + } + + const BUFFER_SIZE: usize = 1024 * 1024; + let buffer_size = BUFFER_SIZE.min(file_length); + + let dma_buf = Vec::with_capacity_in( + file.align_up(buffer_size), + DmaAllocator::new(file.alignment), + ); + file.set_buffer(dma_buf); + + for src in bufs { + let mut src = &src[..]; + + while !src.is_empty() { + let dst = file.buffer(); + if dst.capacity() == dst.len() { + file = asyncify(move || file.write_direct().map(|_| file)).await?; + } + + let dst = file.mut_buffer(); + let remaining = dst.capacity() - dst.len(); + let n = src.len().min(remaining); + let (left, right) = src.split_at(n); + dst.extend_from_slice(left); + src = right; + } + } + + let len = file.buffer().len(); + if len > 0 { + let align_up = file.align_up(len); + if align_up == len { + asyncify(move || file.write_direct()).await?; + } else { + let dst = file.mut_buffer(); + unsafe { dst.set_len(align_up) } + asyncify(move || { + file.write_direct()?; + file.truncate(file_length) + }) + .await?; + } + } + + Ok(file_length) +} + +pub async fn dma_read_file( + path: 
impl AsRef, + mut writer: impl io::Write, +) -> io::Result { + const BUFFER_SIZE: usize = 1024 * 1024; + let mut file = DmaFile::open(path.as_ref()).await?; + let buf = Vec::with_capacity_in( + file.align_up(BUFFER_SIZE), + DmaAllocator::new(file.alignment), + ); + file.set_buffer(buf); + + let mut n = 0; + loop { + file = asyncify(move || { + let buf = file.buffer(); + let remain = buf.capacity() - buf.len(); + file.read_direct(remain).map(|_| file) + }) + .await?; + + let buf = file.buffer(); + if buf.is_empty() { + return Ok(n); + } + n += buf.len(); + writer.write_all(buf)?; + // WARN: Is it possible to have a short read but not eof? + let eof = buf.capacity() > buf.len(); + unsafe { file.mut_buffer().set_len(0) } + if eof { + return Ok(n); + } + } +} + +pub async fn dma_read_file_range( + path: impl AsRef, + range: Range, +) -> io::Result<(DmaBuffer, Range)> { + let mut file = DmaFile::open(path.as_ref()).await?; + + let align_start = file.align_down(range.start as usize); + let align_end = file.align_up(range.end as usize); + + let buf = Vec::with_capacity_in(align_end - align_start, DmaAllocator::new(file.alignment)); + file.set_buffer(buf); + + if align_start != 0 { + let offset = file.seek(SeekFrom::Start(align_start as u64)).await?; + if offset as usize != align_start { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "range out of range", + )); + } + } + + let mut n; + loop { + (file, n) = asyncify(move || { + let buf = file.buffer(); + let remain = buf.capacity() - buf.len(); + file.read_direct(remain).map(|n| (file, n)) + }) + .await?; + if align_start + file.buffer().len() >= range.end as usize { + break; + } + if n == 0 { + return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "")); + } + } + + let rt_range = range.start as usize - align_start..range.end as usize - align_start; + Ok((file.buf.unwrap(), rt_range)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_read_write() { + let _ = std::fs::remove_file("test_file"); + + run_test(0).await.unwrap(); + run_test(100).await.unwrap(); + run_test(200).await.unwrap(); + + run_test(4096 - 1).await.unwrap(); + run_test(4096).await.unwrap(); + run_test(4096 + 1).await.unwrap(); + + run_test(4096 * 2 - 1).await.unwrap(); + run_test(4096 * 2).await.unwrap(); + run_test(4096 * 2 + 1).await.unwrap(); + + run_test(1024 * 1024 * 3 - 1).await.unwrap(); + run_test(1024 * 1024 * 3).await.unwrap(); + run_test(1024 * 1024 * 3 + 1).await.unwrap(); + } + + async fn run_test(n: usize) -> io::Result<()> { + let filename = "test_file"; + let want = (0..n).map(|i| (i % 256) as u8).collect::>(); + + let bufs = vec![IoSlice::new(&want)]; + let length = dma_write_file_vectored(filename, &bufs).await?; + + assert_eq!(length, want.len()); + + let mut got = Vec::new(); + + let length = dma_read_file(filename, &mut got).await?; + assert_eq!(length, want.len()); + assert_eq!(got, want); + + let (buf, range) = dma_read_file_range(filename, 0..length as u64).await?; + assert_eq!(&buf[range], &want); + + std::fs::remove_file(filename)?; + Ok(()) + } + + #[tokio::test] + async fn test_range_read() { + let filename = "test_file2"; + let _ = std::fs::remove_file(filename); + let n: usize = 4096 * 2; + + let want = (0..n).map(|i| (i % 256) as u8).collect::>(); + + let bufs = vec![IoSlice::new(&want)]; + dma_write_file_vectored(filename, &bufs).await.unwrap(); + + let got = dma_read_file_range(filename, 0..10).await.unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[0..10], got); + + let got = 
dma_read_file_range(filename, 10..30).await.unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[10..30], got); + + let got = dma_read_file_range(filename, 4096 - 5..4096 + 5) + .await + .unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[4096 - 5..4096 + 5], got); + + let got = dma_read_file_range(filename, 4096..4096 + 5).await.unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[4096..4096 + 5], got); + + let got = dma_read_file_range(filename, 4096 * 2 - 5..4096 * 2) + .await + .unwrap(); + let got = got.0[got.1].to_vec(); + assert_eq!(&want[4096 * 2 - 5..4096 * 2], got); + + let _ = std::fs::remove_file(filename); + } + + #[tokio::test] + async fn test_read_direct() { + let filename = "test_file3"; + let _ = std::fs::remove_file(filename); + let stat = rustix::fs::statvfs(".").unwrap(); + let alignment = 512.max(stat.f_bsize as usize); + let file_size: usize = alignment * 2; + + let want = (0..file_size).map(|i| (i % 256) as u8).collect::>(); + + let bufs = vec![IoSlice::new(&want)]; + dma_write_file_vectored(filename, &bufs).await.unwrap(); + + let mut file = DmaFile::open(filename).await.unwrap(); + let buf = Vec::with_capacity_in(file_size, DmaAllocator::new(file.alignment)); + file.set_buffer(buf); + + let got = file.read_direct(alignment).unwrap(); + assert_eq!(alignment, got); + assert_eq!(&want[0..alignment], &**file.buffer()); + + let got = file.read_direct(alignment).unwrap(); + assert_eq!(alignment, got); + assert_eq!(&want, &**file.buffer()); + + let _ = std::fs::remove_file(filename); + } +} diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index 175730845145..5ac11ea7a46c 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
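Reviewer note: a minimal usage sketch of the direct-IO helpers introduced in dma.rs above and re-exported from databend_common_base::base in the hunk below. The path and the #[tokio::main] wrapper are illustrative assumptions; the helper signatures are taken from this diff, and O_DIRECT only works on filesystems that support it.

// Sketch only: mirrors the unit tests above rather than prescribing an API.
use std::io::IoSlice;

use databend_common_base::base::dma_read_file;
use databend_common_base::base::dma_read_file_range;
use databend_common_base::base::dma_write_file_vectored;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let path = "/tmp/dma_example"; // hypothetical location
    let _ = std::fs::remove_file(path); // create() opens with O_EXCL, so clear any leftover file
    let payload = vec![42u8; 10_000]; // deliberately not a multiple of the block size

    // Writes are staged through an aligned DmaBuffer; the trailing partial block is padded
    // up to the alignment, written, then the file is truncated back to the logical length
    // (10_000 bytes here).
    let written = dma_write_file_vectored(path, &[IoSlice::new(&payload)]).await?;
    assert_eq!(written, payload.len());

    // Stream the whole file into any io::Write sink.
    let mut copy = Vec::new();
    assert_eq!(dma_read_file(path, &mut copy).await?, written);
    assert_eq!(copy, payload);

    // Range reads round the physical read out to block boundaries (align_down for the start,
    // align_up for the end) and return the sub-range that maps to the requested bytes.
    let (buf, range) = dma_read_file_range(path, 100..200).await?;
    assert_eq!(&buf[range], &payload[100..200]);

    std::fs::remove_file(path)?;
    Ok(())
}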
+mod dma; mod net; mod ordered_float; mod profiling; @@ -27,6 +28,10 @@ mod take_mut; mod uniq_id; mod watch_notify; +pub use dma::dma_buffer_as_vec; +pub use dma::dma_read_file; +pub use dma::dma_read_file_range; +pub use dma::dma_write_file_vectored; pub use net::get_free_tcp_port; pub use net::get_free_udp_port; pub use ordered_float::OrderedFloat; diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 04ae30bd8bb0..31060daffe9e 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -24,6 +24,7 @@ #![feature(alloc_error_hook)] #![feature(slice_swap_unchecked)] #![feature(variant_count)] +#![feature(ptr_alignment_type)] pub mod base; pub mod containers; diff --git a/src/common/base/src/runtime/profile/profiles.rs b/src/common/base/src/runtime/profile/profiles.rs index 3f9a4a40d795..6f75ef5d0afc 100644 --- a/src/common/base/src/runtime/profile/profiles.rs +++ b/src/common/base/src/runtime/profile/profiles.rs @@ -37,12 +37,23 @@ pub enum ProfileStatisticsName { ScanBytes, ScanCacheBytes, ScanPartitions, - SpillWriteCount, - SpillWriteBytes, - SpillWriteTime, - SpillReadCount, - SpillReadBytes, - SpillReadTime, + + RemoteSpillWriteCount, + RemoteSpillWriteBytes, + RemoteSpillWriteTime, + + RemoteSpillReadCount, + RemoteSpillReadBytes, + RemoteSpillReadTime, + + LocalSpillWriteCount, + LocalSpillWriteBytes, + LocalSpillWriteTime, + + LocalSpillReadCount, + LocalSpillReadBytes, + LocalSpillReadTime, + RuntimeFilterPruneParts, MemoryUsage, ExternalServerRetryCount, @@ -189,45 +200,87 @@ pub fn get_statistics_desc() -> Arc unit: StatisticsUnit::Count, plain_statistics: true, }), - (ProfileStatisticsName::SpillWriteCount, ProfileDesc { - display_name: "numbers spilled by write", - desc: "The number of spilled by write", - index: ProfileStatisticsName::SpillWriteCount as usize, + (ProfileStatisticsName::RemoteSpillWriteCount, ProfileDesc { + display_name: "numbers remote spilled by write", + desc: "The number of remote spilled by write", + index: ProfileStatisticsName::RemoteSpillWriteCount as usize, + unit: StatisticsUnit::Count, + plain_statistics: true, + }), + (ProfileStatisticsName::RemoteSpillWriteBytes, ProfileDesc { + display_name: "bytes remote spilled by write", + desc: "The bytes remote spilled by write", + index: ProfileStatisticsName::RemoteSpillWriteBytes as usize, + unit: StatisticsUnit::Bytes, + plain_statistics: true, + }), + (ProfileStatisticsName::RemoteSpillWriteTime, ProfileDesc { + display_name: "remote spilled time by write", + desc: "The time spent to write remote spill in millisecond", + index: ProfileStatisticsName::RemoteSpillWriteTime as usize, + unit: StatisticsUnit::MillisSeconds, + plain_statistics: false, + }), + (ProfileStatisticsName::RemoteSpillReadCount, ProfileDesc { + display_name: "numbers remote spilled by read", + desc: "The number of remote spilled by read", + index: ProfileStatisticsName::RemoteSpillReadCount as usize, + unit: StatisticsUnit::Count, + plain_statistics: true, + }), + (ProfileStatisticsName::RemoteSpillReadBytes, ProfileDesc { + display_name: "bytes remote spilled by read", + desc: "The bytes remote spilled by read", + index: ProfileStatisticsName::RemoteSpillReadBytes as usize, + unit: StatisticsUnit::Bytes, + plain_statistics: true, + }), + (ProfileStatisticsName::RemoteSpillReadTime, ProfileDesc { + display_name: "remote spilled time by read", + desc: "The time spent to read remote spill in millisecond", + index: ProfileStatisticsName::RemoteSpillReadTime as usize, + unit: 
StatisticsUnit::MillisSeconds, + plain_statistics: false, + }), + (ProfileStatisticsName::LocalSpillWriteCount, ProfileDesc { + display_name: "numbers local spilled by write", + desc: "The number of local spilled by write", + index: ProfileStatisticsName::LocalSpillWriteCount as usize, unit: StatisticsUnit::Count, plain_statistics: true, }), - (ProfileStatisticsName::SpillWriteBytes, ProfileDesc { - display_name: "bytes spilled by write", - desc: "The bytes spilled by write", - index: ProfileStatisticsName::SpillWriteBytes as usize, + (ProfileStatisticsName::LocalSpillWriteBytes, ProfileDesc { + display_name: "bytes local spilled by write", + desc: "The bytes local spilled by write", + index: ProfileStatisticsName::LocalSpillWriteBytes as usize, unit: StatisticsUnit::Bytes, plain_statistics: true, }), - (ProfileStatisticsName::SpillWriteTime, ProfileDesc { - display_name: "spilled time by write", - desc: "The time spent to write spill in millisecond", - index: ProfileStatisticsName::SpillWriteTime as usize, + (ProfileStatisticsName::LocalSpillWriteTime, ProfileDesc { + display_name: "local spilled time by write", + desc: "The time spent to write local spill in millisecond", + index: ProfileStatisticsName::LocalSpillWriteTime as usize, unit: StatisticsUnit::MillisSeconds, plain_statistics: false, }), - (ProfileStatisticsName::SpillReadCount, ProfileDesc { - display_name: "numbers spilled by read", - desc: "The number of spilled by read", - index: ProfileStatisticsName::SpillReadCount as usize, + (ProfileStatisticsName::LocalSpillReadCount, ProfileDesc { + display_name: "numbers local spilled by read", + desc: "The number of local spilled by read", + index: ProfileStatisticsName::LocalSpillReadCount as usize, unit: StatisticsUnit::Count, plain_statistics: true, }), - (ProfileStatisticsName::SpillReadBytes, ProfileDesc { - display_name: "bytes spilled by read", - desc: "The bytes spilled by read", - index: ProfileStatisticsName::SpillReadBytes as usize, + (ProfileStatisticsName::LocalSpillReadBytes, ProfileDesc { + display_name: "bytes local spilled by read", + desc: "The bytes local spilled by read", + index: ProfileStatisticsName::LocalSpillReadBytes as usize, unit: StatisticsUnit::Bytes, plain_statistics: true, }), - (ProfileStatisticsName::SpillReadTime, ProfileDesc { - display_name: "spilled time by read", - desc: "The time spent to read spill in millisecond", - index: ProfileStatisticsName::SpillReadTime as usize, + (ProfileStatisticsName::LocalSpillReadTime, ProfileDesc { + display_name: "local spilled time by read", + desc: "The time spent to read local spill in millisecond", + index: ProfileStatisticsName::LocalSpillReadTime as usize, unit: StatisticsUnit::MillisSeconds, plain_statistics: false, }), diff --git a/src/common/tracing/Cargo.toml b/src/common/tracing/Cargo.toml index 58ebe2903dbf..e0c549930abc 100644 --- a/src/common/tracing/Cargo.toml +++ b/src/common/tracing/Cargo.toml @@ -21,7 +21,7 @@ defer = "0.2" fastrace = { workspace = true } fastrace-opentelemetry = { workspace = true } itertools = { workspace = true } -libc = "0.2.153" +libc = { workspace = true } log = { workspace = true } logforth = { version = "0.12", features = [ 'json', diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 767bdb255b22..c1a9dfab71bb 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -25,6 +25,7 @@ use clap::Parser; use clap::Subcommand; use clap::ValueEnum; use databend_common_base::base::mask_string; +use 
databend_common_base::base::OrderedFloat; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::UserSettingValue; @@ -132,6 +133,10 @@ pub struct Config { #[clap(flatten)] pub cache: CacheConfig, + // spill Config + #[clap(flatten)] + pub spill: SpillConfig, + // background configs #[clap(flatten)] pub background: BackgroundConfig, @@ -2930,6 +2935,26 @@ pub struct DiskCacheConfig { pub sync_data: bool, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Args, Default)] +#[serde(default, deny_unknown_fields)] +pub struct SpillConfig { + /// Path of spill to local disk. disable if it's empty. + #[clap( + long, + value_name = "VALUE", + default_value = "./.databend/temp/_query_spill" + )] + pub spill_local_disk_path: String, + + #[clap(long, value_name = "VALUE", default_value = "30")] + /// Percentage of reserve disk space that won't be used for spill to local disk. + pub spill_local_disk_reserved_space_percentage: OrderedFloat, + + #[clap(long, value_name = "VALUE", default_value = "18446744073709551615")] + /// Allow space in bytes to spill to local disk. + pub spill_local_disk_max_bytes: u64, +} + mod cache_config_converters { use log::warn; @@ -2953,6 +2978,7 @@ mod cache_config_converters { .map(|(k, v)| (k, v.into())) .collect(), cache: inner.cache.into(), + spill: inner.spill.into(), background: inner.background.into(), } } @@ -2985,6 +3011,7 @@ mod cache_config_converters { storage: self.storage.try_into()?, catalogs, cache: self.cache.try_into()?, + spill: self.spill.try_into()?, background: self.background.try_into()?, }) } @@ -3047,6 +3074,41 @@ mod cache_config_converters { } } + impl TryFrom for inner::SpillConfig { + type Error = ErrorCode; + + fn try_from(value: SpillConfig) -> std::result::Result { + let SpillConfig { + spill_local_disk_path, + spill_local_disk_reserved_space_percentage: spill_local_disk_max_space_percentage, + spill_local_disk_max_bytes, + } = value; + if !spill_local_disk_max_space_percentage.is_normal() + || spill_local_disk_max_space_percentage.is_sign_negative() + || spill_local_disk_max_space_percentage > OrderedFloat(100.0) + { + return Err(ErrorCode::InvalidArgument( + "invalid spill_local_disk_max_space_percentage", + )); + } + Ok(Self { + path: spill_local_disk_path, + reserved_disk_ratio: spill_local_disk_max_space_percentage / 100.0, + global_bytes_limit: spill_local_disk_max_bytes, + }) + } + } + + impl From for SpillConfig { + fn from(value: inner::SpillConfig) -> Self { + Self { + spill_local_disk_path: value.path, + spill_local_disk_reserved_space_percentage: value.reserved_disk_ratio * 100.0, + spill_local_disk_max_bytes: value.global_bytes_limit, + } + } + } + impl TryFrom for inner::DiskCacheConfig { type Error = ErrorCode; fn try_from(value: DiskCacheConfig) -> std::result::Result { diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index cf6dc8fb847f..9b508d9daf21 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -22,6 +22,7 @@ use std::time::Duration; use databend_common_base::base::mask_string; use databend_common_base::base::GlobalUniqName; +use databend_common_base::base::OrderedFloat; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_grpc::RpcClientConf; @@ -64,6 +65,9 @@ pub struct InnerConfig { // Cache Config pub cache: CacheConfig, + // Spill Config + pub spill: SpillConfig, + // Background Config pub background: InnerBackgroundConfig, } @@ 
-141,6 +145,7 @@ impl Debug for InnerConfig { .field("storage", &self.storage) .field("catalogs", &self.catalogs) .field("cache", &self.cache) + .field("spill", &self.spill) .field("background", &self.background) .finish() } @@ -701,3 +706,25 @@ impl Default for CacheConfig { } } } + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SpillConfig { + /// Path of spill to local disk. disable if it's empty. + pub path: String, + + /// Ratio of the reserve of the disk space. + pub reserved_disk_ratio: OrderedFloat, + + /// Allow bytes use of disk space. + pub global_bytes_limit: u64, +} + +impl Default for SpillConfig { + fn default() -> Self { + Self { + path: "./.databend/temp/_query_spill".to_string(), + reserved_disk_ratio: OrderedFloat(0.3), + global_bytes_limit: u64::MAX, + } + } +} diff --git a/src/query/config/src/lib.rs b/src/query/config/src/lib.rs index 269241a72bed..c6a9313447c9 100644 --- a/src/query/config/src/lib.rs +++ b/src/query/config/src/lib.rs @@ -49,6 +49,7 @@ pub use inner::CatalogConfig; pub use inner::CatalogHiveConfig; pub use inner::DiskCacheKeyReloadPolicy; pub use inner::InnerConfig; +pub use inner::SpillConfig; pub use inner::ThriftProtocol; pub use version::DATABEND_COMMIT_VERSION; pub use version::QUERY_GIT_SEMVER; diff --git a/src/query/config/src/mask.rs b/src/query/config/src/mask.rs index 37fae7279aac..62a5086b2c52 100644 --- a/src/query/config/src/mask.rs +++ b/src/query/config/src/mask.rs @@ -51,6 +51,7 @@ impl Config { storage: self.storage.mask_display(), catalog: self.catalog, cache: self.cache, + spill: self.spill, background: self.background, catalogs: self.catalogs, } diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index c9f91b8818ef..c40a4ebb8601 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -35,6 +35,7 @@ use databend_common_users::builtin::BuiltIn; use databend_common_users::RoleCacheManager; use databend_common_users::UserApiProvider; use databend_storages_common_cache::CacheManager; +use databend_storages_common_cache::TempDirManager; use crate::auth::AuthMgr; use crate::builtin::BuiltinUDFs; @@ -146,6 +147,7 @@ impl GlobalServices { &config.query.max_server_memory_usage, config.query.tenant_id.tenant_name().to_string(), )?; + TempDirManager::init(&config.spill, config.query.tenant_id.tenant_name())?; if let Some(addr) = config.query.cloud_control_grpc_server_address.clone() { CloudControlApiProvider::init(addr, config.query.cloud_control_grpc_timeout).await?; diff --git a/src/query/service/src/interpreters/hook/vacuum_hook.rs b/src/query/service/src/interpreters/hook/vacuum_hook.rs index 35d0682bba30..21f01ea8cc6f 100644 --- a/src/query/service/src/interpreters/hook/vacuum_hook.rs +++ b/src/query/service/src/interpreters/hook/vacuum_hook.rs @@ -23,7 +23,10 @@ use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_pipeline_core::query_spill_prefix; use databend_common_storage::DataOperator; use databend_enterprise_vacuum_handler::get_vacuum_handler; +use databend_storages_common_cache::TempDirManager; +use log::warn; use opendal::Buffer; +use rand::Rng; use crate::sessions::QueryContext; @@ -65,3 +68,19 @@ pub fn hook_vacuum_temp_files(query_ctx: &Arc) -> Result<()> { Ok(()) } + +pub fn hook_disk_temp_dir(query_ctx: &Arc) -> Result<()> { + let mgr = TempDirManager::instance(); + + if mgr.drop_disk_spill_dir(&query_ctx.get_id())? 
&& rand::thread_rng().gen_ratio(1, 10) { + let limit = query_ctx + .get_settings() + .get_spilling_to_disk_vacuum_unknown_temp_dirs_limit()?; + let deleted = mgr.drop_disk_spill_dir_unknown(limit)?; + if !deleted.is_empty() { + warn!("Deleted residual temporary directories: {:?}", deleted) + } + } + + Ok(()) +} diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index c74892dc32a0..1f220e17edf4 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -44,10 +44,11 @@ use log::info; use md5::Digest; use md5::Md5; -use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files; -use crate::interpreters::interpreter_txn_commit::CommitInterpreter; -use crate::interpreters::InterpreterMetrics; -use crate::interpreters::InterpreterQueryLog; +use super::hook::vacuum_hook::hook_disk_temp_dir; +use super::hook::vacuum_hook::hook_vacuum_temp_files; +use super::interpreter_txn_commit::CommitInterpreter; +use super::InterpreterMetrics; +use super::InterpreterQueryLog; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::executor::PipelinePullingExecutor; @@ -285,6 +286,8 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc) hook_vacuum_temp_files(&query_ctx)?; + hook_disk_temp_dir(&query_ctx)?; + let err_opt = match &info.res { Ok(_) => None, Err(e) => Some(e.clone()), diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index afbb3e25b85a..cb039e615312 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -282,14 +282,17 @@ impl SortPipelineBuilder { if may_spill { let schema = add_order_field(sort_merge_output_schema.clone(), &self.sort_desc); - let config = SpillerConfig::create(query_spill_prefix( - self.ctx.get_tenant().tenant_name(), - &self.ctx.get_id(), - )); + let config = SpillerConfig { + location_prefix: query_spill_prefix( + self.ctx.get_tenant().tenant_name(), + &self.ctx.get_id(), + ), + disk_spill: None, + spiller_type: SpillerType::OrderBy, + }; pipeline.add_transform(|input, output| { let op = DataOperator::instance().operator(); - let spiller = - Spiller::create(self.ctx.clone(), op, config.clone(), SpillerType::OrderBy)?; + let spiller = Spiller::create(self.ctx.clone(), op, config.clone())?; Ok(ProcessorPtr::create(create_transform_sort_spill( input, output, diff --git a/src/query/service/src/pipelines/builders/builder_window.rs b/src/query/service/src/pipelines/builders/builder_window.rs index 8fa28c443dbc..0bddf1fb2aed 100644 --- a/src/query/service/src/pipelines/builders/builder_window.rs +++ b/src/query/service/src/pipelines/builders/builder_window.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
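Reviewer note: the calls into TempDirManager are spread across global_services.rs, vacuum_hook.rs and builder_window.rs; collected in one place, the lifecycle added by this PR looks roughly like the sketch below. Argument and return types are inferred from the call sites in this diff and may not match the real definitions exactly.

use std::sync::Arc;

use databend_common_config::InnerConfig;
use databend_common_exception::Result;
use databend_storages_common_cache::TempDir;
use databend_storages_common_cache::TempDirManager;

fn disk_spill_lifecycle(config: &InnerConfig, query_id: &str, disk_bytes_limit: u64) -> Result<()> {
    // Once per process (global_services.rs): register the tenant's local spill root,
    // honouring path, reserved_disk_ratio and global_bytes_limit from config.spill.
    TempDirManager::init(&config.spill, config.query.tenant_id.tenant_name())?;

    // Per query (builder_window.rs): ask for a per-query spill directory, capped by the
    // window_partition_spilling_to_disk_bytes_limit setting; None disables disk spill.
    let disk_spill: Option<Arc<TempDir>> =
        TempDirManager::instance().get_disk_spill_dir(disk_bytes_limit, query_id);
    drop(disk_spill);

    // On query completion (vacuum_hook.rs): remove this query's directory, and roughly one
    // query in ten also sweeps directories left behind by crashed queries, bounded by the
    // spilling_to_disk_vacuum_unknown_temp_dirs_limit setting (1024 is a placeholder here).
    if TempDirManager::instance().drop_disk_spill_dir(query_id)? {
        let _deleted = TempDirManager::instance().drop_disk_spill_dir_unknown(1024)?;
    }
    Ok(())
}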
+use std::sync::atomic; use std::sync::atomic::AtomicUsize; use databend_common_catalog::table_context::TableContext; @@ -24,6 +25,7 @@ use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_sql::executor::physical_plans::Window; use databend_common_sql::executor::physical_plans::WindowPartition; +use databend_storages_common_cache::TempDirManager; use crate::pipelines::processors::transforms::FrameBound; use crate::pipelines::processors::transforms::TransformWindowPartitionCollect; @@ -141,11 +143,6 @@ impl PipelineBuilder { // Settings. let settings = self.ctx.get_settings(); let num_partitions = settings.get_window_num_partitions()?; - let max_block_size = settings.get_max_block_size()? as usize; - let sort_block_size = settings.get_window_partition_sort_block_size()? as usize; - let sort_spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?; - let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; - let window_spill_settings = WindowSpillSettings::new(settings.clone(), num_processors)?; let plan_schema = window_partition.output_schema()?; @@ -169,13 +166,18 @@ impl PipelineBuilder { }) .collect::>>()?; - let have_order_col = window_partition.after_exchange.unwrap_or(false); - self.main_pipeline.exchange( num_processors, WindowPartitionExchange::create(partition_by.clone(), num_partitions), ); + let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?; + let temp_dir_manager = TempDirManager::instance(); + let disk_spill = temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()); + + let window_spill_settings = WindowSpillSettings::new(&settings, num_processors)?; + let have_order_col = window_partition.after_exchange.unwrap_or(false); + let processor_id = AtomicUsize::new(0); self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(Box::new( @@ -183,16 +185,14 @@ impl PipelineBuilder { self.ctx.clone(), input, output, - processor_id.fetch_add(1, std::sync::atomic::Ordering::AcqRel), + &settings, + processor_id.fetch_add(1, atomic::Ordering::AcqRel), num_processors, num_partitions, window_spill_settings.clone(), + disk_spill.clone(), sort_desc.clone(), plan_schema.clone(), - max_block_size, - sort_block_size, - sort_spilling_batch_bytes, - enable_loser_tree, have_order_col, )?, ))) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs index 787a199fe537..3b3d56586b54 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs @@ -269,10 +269,13 @@ pub fn agg_spilling_aggregate_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } @@ -366,10 +369,13 @@ pub fn spilling_aggregate_payload( // perf { - 
Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 8d621aba7ea6..7e38f9ec41e0 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -311,10 +311,13 @@ fn agg_spilling_aggregate_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } @@ -432,10 +435,13 @@ fn spilling_aggregate_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs index bdd2b95e29cd..d68a956d1ec9 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -365,10 +365,13 @@ fn agg_spilling_group_by_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } @@ -484,10 +487,13 @@ fn spilling_group_by_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + 
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs index 5a3a35219780..04d0c36b7f3d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_spill_writer.rs @@ -267,10 +267,13 @@ pub fn agg_spilling_group_by_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } @@ -360,10 +363,13 @@ pub fn spilling_group_by_payload( // perf { - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteBytes, + write_bytes, + ); Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, + ProfileStatisticsName::RemoteSpillWriteTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs index 3bf4f8bd7d3a..f625de75db9b 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs @@ -227,15 +227,15 @@ impl Processor // perf { Profile::record_usize_profile( - ProfileStatisticsName::SpillReadCount, + ProfileStatisticsName::RemoteSpillReadCount, 1, ); Profile::record_usize_profile( - ProfileStatisticsName::SpillReadBytes, + ProfileStatisticsName::RemoteSpillReadBytes, data.len(), ); Profile::record_usize_profile( - ProfileStatisticsName::SpillReadTime, + ProfileStatisticsName::RemoteSpillReadTime, instant.elapsed().as_millis() as usize, ); } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs index c9c0a9977341..876542882644 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_spiller.rs @@ -62,17 +62,18 @@ impl HashJoinSpiller { is_build_side: bool, ) -> Result { // Create a Spiller for spilling build side data. 
- let spill_config = SpillerConfig::create(query_spill_prefix( - ctx.get_tenant().tenant_name(), - &ctx.get_id(), - )); - let operator = DataOperator::instance().operator(); let spiller_type = if is_build_side { SpillerType::HashJoinBuild } else { SpillerType::HashJoinProbe }; - let spiller = Spiller::create(ctx.clone(), operator, spill_config, spiller_type)?; + let spill_config = SpillerConfig { + location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), + disk_spill: None, + spiller_type, + }; + let operator = DataOperator::instance().operator(); + let spiller = Spiller::create(ctx.clone(), operator, spill_config)?; let num_partitions = (1 << spill_partition_bits) as usize; // The memory threshold of each partition, we will spill the partition data diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index 67e9e0d21c4c..f0c6bd97d556 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -43,6 +43,7 @@ use databend_common_pipeline_transforms::processors::sort::SortSpillMeta; use databend_common_pipeline_transforms::processors::sort::SortSpillMetaWithParams; use databend_common_pipeline_transforms::processors::sort::SortedStream; +use crate::spillers::Location; use crate::spillers::Spiller; enum State { @@ -77,7 +78,7 @@ pub struct TransformSortSpill { /// Blocks to merge one time. num_merge: usize, /// Unmerged list of blocks. Each list are sorted. - unmerged_blocks: VecDeque>, + unmerged_blocks: VecDeque>, /// If `ummerged_blocks.len()` < `num_merge`, /// we can use a final merger to merge the last few sorted streams to reduce IO. 
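Reviewer note: `Location` is re-exported from spillers/mod.rs later in this diff, but its definition falls outside the hunks shown here. From its uses (an opendal key in the remote case, a TempPath with a size() in the local case, and service as a HashMap key) a plausible shape is the hedged reconstruction below; the real definition in spiller.rs may differ.

use databend_storages_common_cache::TempPath;

// Hypothetical reconstruction from usage only, not the authoritative definition.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum Location {
    /// Object-storage key under the query's spill prefix, read back via the opendal Operator.
    Remote(String),
    /// Local temporary file managed by TempDirManager, read back via dma_read_file_range.
    Local(TempPath),
}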
@@ -359,7 +360,7 @@ where R: Rows + Sync + Send + 'static } enum BlockStream { - Spilled((VecDeque, Arc)), + Spilled((VecDeque, Arc)), Block(Option), } @@ -485,12 +486,13 @@ mod tests { limit: Option, ) -> Result>> { let op = DataOperator::instance().operator(); - let spiller = Spiller::create( - ctx.clone(), - op, - SpillerConfig::create("_spill_test".to_string()), - SpillerType::OrderBy, - )?; + let spill_config = SpillerConfig { + location_prefix: "_spill_test".to_string(), + disk_spill: None, + spiller_type: SpillerType::OrderBy, + }; + + let spiller = Spiller::create(ctx.clone(), op, spill_config)?; let sort_desc = Arc::new(vec![SortColumnDescription { offset: 0, diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index e81138ce4ea5..f983b6208e65 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -29,12 +29,20 @@ use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; +use databend_common_pipeline_core::query_spill_prefix; use databend_common_pipeline_transforms::processors::sort_merge; +use databend_common_settings::Settings; +use databend_common_storage::DataOperator; +use databend_common_storages_fuse::TableContext; +use databend_storages_common_cache::TempDir; use super::WindowPartitionBuffer; use super::WindowPartitionMeta; use super::WindowSpillSettings; use crate::sessions::QueryContext; +use crate::spillers::Spiller; +use crate::spillers::SpillerConfig; +use crate::spillers::SpillerType; #[derive(Debug, Clone, Copy)] pub enum Step { @@ -81,21 +89,19 @@ pub struct TransformWindowPartitionCollect { } impl TransformWindowPartitionCollect { - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] pub fn new( ctx: Arc, input: Arc, output: Arc, + settings: &Settings, processor_id: usize, num_processors: usize, num_partitions: usize, spill_settings: WindowSpillSettings, + disk_spill: Option>, sort_desc: Vec, schema: DataSchemaRef, - max_block_size: usize, - sort_block_size: usize, - sort_spilling_batch_bytes: usize, - enable_loser_tree: bool, have_order_col: bool, ) -> Result { // Calculate the partition ids collected by the processor. @@ -109,9 +115,24 @@ impl TransformWindowPartitionCollect { partition_id[*partition] = new_partition_id; } + let spill_config = SpillerConfig { + location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()), + disk_spill, + spiller_type: SpillerType::Window, + }; + + // Create an inner `Spiller` to spill data. + let operator = DataOperator::instance().operator(); + let spiller = Spiller::create(ctx, operator, spill_config)?; + // Create the window partition buffer. + let sort_block_size = settings.get_window_partition_sort_block_size()? as usize; let buffer = - WindowPartitionBuffer::new(ctx, partitions.len(), sort_block_size, spill_settings)?; + WindowPartitionBuffer::new(spiller, partitions.len(), sort_block_size, spill_settings)?; + + let max_block_size = settings.get_max_block_size()? 
as usize; + let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?; + let sort_spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?; Ok(Self { input, diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs index dd93b0016c3c..28c6b9b2068d 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer.rs @@ -12,23 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use databend_common_base::runtime::GLOBAL_MEM_STAT; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use databend_common_pipeline_core::query_spill_prefix; use databend_common_settings::Settings; -use databend_common_storage::DataOperator; -use databend_common_storages_fuse::TableContext; -use crate::sessions::QueryContext; use crate::spillers::PartitionBuffer; use crate::spillers::PartitionBufferFetchOption; use crate::spillers::SpilledData; use crate::spillers::Spiller; -use crate::spillers::SpillerConfig; -use crate::spillers::SpillerType; /// The `WindowPartitionBuffer` is used to control memory usage of Window operator. pub struct WindowPartitionBuffer { @@ -46,19 +38,11 @@ pub struct WindowPartitionBuffer { impl WindowPartitionBuffer { pub fn new( - ctx: Arc, + spiller: Spiller, num_partitions: usize, sort_block_size: usize, spill_settings: WindowSpillSettings, ) -> Result { - // Create an inner `Spiller` to spill data. - let spill_config = SpillerConfig::create(query_spill_prefix( - ctx.get_tenant().tenant_name(), - &ctx.get_id(), - )); - let operator = DataOperator::instance().operator(); - let spiller = Spiller::create(ctx.clone(), operator, spill_config, SpillerType::Window)?; - // Create a `PartitionBuffer` to store partitioned data. 
let partition_buffer = PartitionBuffer::create(num_partitions); let restored_partition_buffer = PartitionBuffer::create(num_partitions); @@ -296,7 +280,7 @@ pub struct WindowSpillSettings { } impl WindowSpillSettings { - pub fn new(settings: Arc, num_threads: usize) -> Result { + pub fn new(settings: &Settings, num_threads: usize) -> Result { let global_memory_ratio = std::cmp::min(settings.get_window_partition_spilling_memory_ratio()?, 100) as f64 / 100_f64; diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index b771bce717f7..31d420849041 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -17,6 +17,7 @@ mod spiller; pub use partition_buffer::PartitionBuffer; pub use partition_buffer::PartitionBufferFetchOption; +pub use spiller::Location; pub use spiller::SpilledData; pub use spiller::Spiller; pub use spiller::SpillerConfig; diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 54c4bd58129d..d13229b65df6 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -16,10 +16,14 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; +use std::io; use std::ops::Range; use std::sync::Arc; use std::time::Instant; +use databend_common_base::base::dma_buffer_as_vec; +use databend_common_base::base::dma_read_file_range; +use databend_common_base::base::dma_write_file_vectored; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; @@ -29,6 +33,8 @@ use databend_common_exception::Result; use databend_common_expression::arrow::deserialize_column; use databend_common_expression::arrow::serialize_column; use databend_common_expression::DataBlock; +use databend_storages_common_cache::TempDir; +use databend_storages_common_cache::TempPath; use opendal::Operator; use crate::sessions::QueryContext; @@ -59,12 +65,8 @@ impl Display for SpillerType { #[derive(Clone)] pub struct SpillerConfig { pub location_prefix: String, -} - -impl SpillerConfig { - pub fn create(location_prefix: String) -> Self { - Self { location_prefix } - } + pub disk_spill: Option>, + pub spiller_type: SpillerType, } /// Spiller is a unified framework for operators which need to spill data from memory. @@ -77,13 +79,14 @@ impl SpillerConfig { pub struct Spiller { ctx: Arc, operator: Operator, - config: SpillerConfig, + location_prefix: String, + disk_spill: Option>, _spiller_type: SpillerType, pub join_spilling_partition_bits: usize, /// 1 partition -> N partition files - pub partition_location: HashMap>, + pub partition_location: HashMap>, /// Record columns layout for spilled data, will be used when read data from disk - pub columns_layout: HashMap>, + pub columns_layout: HashMap>, /// Record how many bytes have been spilled for each partition. 
pub partition_spilled_bytes: HashMap, } @@ -94,13 +97,18 @@ impl Spiller { ctx: Arc, operator: Operator, config: SpillerConfig, - spiller_type: SpillerType, ) -> Result { let join_spilling_partition_bits = ctx.get_settings().get_join_spilling_partition_bits()?; + let SpillerConfig { + location_prefix, + disk_spill, + spiller_type, + } = config; Ok(Self { - ctx: ctx.clone(), + ctx, operator, - config, + location_prefix, + disk_spill, _spiller_type: spiller_type, join_spilling_partition_bits, partition_location: Default::default(), @@ -114,31 +122,20 @@ impl Spiller { } /// Spill a [`DataBlock`] to storage. - pub async fn spill(&mut self, data_block: DataBlock) -> Result { - // Serialize data block. - let (data_size, columns_data, columns_layout) = self.serialize_data_block(data_block)?; + pub async fn spill(&mut self, data_block: DataBlock) -> Result { + let instant = Instant::now(); // Spill data to storage. - let instant = Instant::now(); - let unique_name = GlobalUniqName::unique(); - let location = format!("{}/{}", self.config.location_prefix, unique_name); - let mut writer = self - .operator - .writer_with(&location) - .chunk(8 * 1024 * 1024) - .await?; - for data in columns_data.into_iter() { - writer.write(data).await?; - } - writer.close().await?; + let encoded = EncodedBlock::from_block(data_block); + let columns_layout = encoded.columns_layout(); + let data_size = encoded.size(); + let location = self.write_encodes(data_size, vec![encoded]).await?; // Record statistics. - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, data_size as usize); - Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, - instant.elapsed().as_millis() as usize, - ); + match location { + Location::Remote(_) => record_remote_write_profile(&instant, data_size), + Location::Local(_) => record_local_write_profile(&instant, data_size), + } // Record columns layout for spilled data. self.columns_layout.insert(location.clone(), columns_layout); @@ -187,37 +184,25 @@ impl Spiller { let mut spilled_partitions = Vec::with_capacity(partitioned_data.len()); for (partition_id, data_block) in partitioned_data.into_iter() { let begin = write_bytes; - let (data_size, columns_data, columns_layout) = - self.serialize_data_block(data_block)?; + + let encoded = EncodedBlock::from_block(data_block); + let columns_layout = encoded.columns_layout(); + let data_size = encoded.size(); write_bytes += data_size; - write_data.push(columns_data); + write_data.push(encoded); spilled_partitions.push((partition_id, begin..write_bytes, columns_layout)); } // Spill data to storage. let instant = Instant::now(); - let unique_name = GlobalUniqName::unique(); - let location = format!("{}/{}", self.config.location_prefix, unique_name); - let mut writer = self - .operator - .writer_with(&location) - .chunk(8 * 1024 * 1024) - .await?; - for write_bucket_data in write_data.into_iter() { - for data in write_bucket_data.into_iter() { - writer.write(data).await?; - } - } - writer.close().await?; + let location = self.write_encodes(write_bytes, write_data).await?; // Record statistics. 
- Profile::record_usize_profile(ProfileStatisticsName::SpillWriteCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillWriteBytes, write_bytes as usize); - Profile::record_usize_profile( - ProfileStatisticsName::SpillWriteTime, - instant.elapsed().as_millis() as usize, - ); + match location { + Location::Remote(_) => record_remote_write_profile(&instant, write_bytes), + Location::Local(_) => record_local_write_profile(&instant, write_bytes), + } Ok(SpilledData::MergedPartition { location, @@ -227,43 +212,38 @@ impl Spiller { /// Read a certain file to a [`DataBlock`]. /// We should guarantee that the file is managed by this spiller. - pub async fn read_spilled_file(&self, file: &str) -> Result { - debug_assert!(self.columns_layout.contains_key(file)); + pub async fn read_spilled_file(&self, location: &Location) -> Result { + let columns_layout = self.columns_layout.get(location).unwrap(); // Read spilled data from storage. let instant = Instant::now(); - let data = self.operator.read(file).await?.to_bytes(); - - // Record statistics. - Profile::record_usize_profile(ProfileStatisticsName::SpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillReadBytes, data.len()); - Profile::record_usize_profile( - ProfileStatisticsName::SpillReadTime, - instant.elapsed().as_millis() as usize, - ); - - // Deserialize data block. - let mut begin = 0; - let mut columns = Vec::with_capacity(self.columns_layout.len()); - let columns_layout = self.columns_layout.get(file).unwrap(); - for column_layout in columns_layout.iter() { - columns.push( - deserialize_column(&data[begin as usize..(begin + column_layout) as usize]) - .unwrap(), - ); - begin += column_layout; - } + let block = match location { + Location::Remote(loc) => { + let data = self.operator.read(loc).await?.to_bytes(); + record_remote_read_profile(&instant, data.len()); + deserialize_block(columns_layout, &data) + } + Location::Local(path) => { + let file_size = path.size(); + debug_assert_eq!(file_size, columns_layout.iter().sum::()); + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + let data = &buf[range]; + record_local_read_profile(&instant, data.len()); + deserialize_block(columns_layout, data) + } + }; - Ok(DataBlock::new_from_columns(columns)) + Ok(block) } #[async_backtrace::framed] /// Read spilled data with partition id pub async fn read_spilled_partition(&mut self, p_id: &usize) -> Result> { - if let Some(files) = self.partition_location.get(p_id) { - let mut spilled_data = Vec::with_capacity(files.len()); - for file in files.iter() { - let block = self.read_spilled_file(file).await?; + if let Some(locs) = self.partition_location.get(p_id) { + let mut spilled_data = Vec::with_capacity(locs.len()); + for loc in locs.iter() { + let block = self.read_spilled_file(loc).await?; + if block.num_rows() != 0 { spilled_data.push(block); } @@ -285,30 +265,43 @@ impl Spiller { { // Read spilled data from storage. let instant = Instant::now(); - let data = self.operator.read(location).await?.to_bytes(); + + let data = match location { + Location::Remote(loc) => self.operator.read(loc).await?.to_bytes(), + Location::Local(path) => { + let file_size = path.size(); + debug_assert_eq!( + file_size, + if let Some((_, range, _)) = partitions.last() { + range.end + } else { + 0 + } + ); + + let (mut buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + assert_eq!(range.start, 0); + buf.truncate(range.end); + + dma_buffer_as_vec(buf).into() + } + }; // Record statistics. 
- Profile::record_usize_profile(ProfileStatisticsName::SpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillReadBytes, data.len()); - Profile::record_usize_profile( - ProfileStatisticsName::SpillReadTime, - instant.elapsed().as_millis() as usize, - ); + match location { + Location::Remote(_) => record_remote_read_profile(&instant, data.len()), + Location::Local(_) => record_local_read_profile(&instant, data.len()), + }; // Deserialize partitioned data block. - let mut partitioned_data = Vec::with_capacity(partitions.len()); - for (partition_id, range, columns_layout) in partitions.iter() { - let mut begin = range.start; - let mut columns = Vec::with_capacity(columns_layout.len()); - for column_layout in columns_layout.iter() { - columns.push( - deserialize_column(&data[begin as usize..(begin + column_layout) as usize]) - .unwrap(), - ); - begin += column_layout; - } - partitioned_data.push((*partition_id, DataBlock::new_from_columns(columns))); - } + let partitioned_data = partitions + .iter() + .map(|(partition_id, range, columns_layout)| { + let block = deserialize_block(columns_layout, &data[range.clone()]); + (*partition_id, block) + }) + .collect(); + return Ok(partitioned_data); } Ok(vec![]) @@ -316,70 +309,166 @@ impl Spiller { pub async fn read_range( &self, - location: &str, - data_range: Range, - columns_layout: &[u64], + location: &Location, + data_range: Range, + columns_layout: &[usize], ) -> Result { // Read spilled data from storage. let instant = Instant::now(); - let data = self - .operator - .read_with(location) - .range(data_range) - .await? - .to_vec(); + let data_range = data_range.start as u64..data_range.end as u64; + + match location { + Location::Remote(loc) => { + let data = self + .operator + .read_with(loc) + .range(data_range) + .await? + .to_bytes(); + record_remote_read_profile(&instant, data.len()); + Ok(deserialize_block(columns_layout, &data)) + } + Location::Local(path) => { + let (buf, range) = dma_read_file_range(path, data_range).await?; + let data = &buf[range]; + record_local_read_profile(&instant, data.len()); + Ok(deserialize_block(columns_layout, data)) + } + } + } - // Record statistics. - Profile::record_usize_profile(ProfileStatisticsName::SpillReadCount, 1); - Profile::record_usize_profile(ProfileStatisticsName::SpillReadBytes, data.len()); - Profile::record_usize_profile( - ProfileStatisticsName::SpillReadTime, - instant.elapsed().as_millis() as usize, - ); - - // Deserialize data block. 
- let mut begin = 0; - let mut columns = Vec::with_capacity(columns_layout.len()); - for column_layout in columns_layout.iter() { - columns.push( - deserialize_column(&data[begin as usize..(begin + column_layout) as usize]) - .unwrap(), - ); - begin += column_layout; + async fn write_encodes(&mut self, size: usize, blocks: Vec) -> Result { + let location = match &self.disk_spill { + None => None, + Some(disk) => disk.new_file_with_size(size)?.map(Location::Local), } + .unwrap_or(Location::Remote(format!( + "{}/{}", + self.location_prefix, + GlobalUniqName::unique(), + ))); + + let written = match &location { + Location::Remote(loc) => { + let mut writer = self + .operator + .writer_with(loc) + .chunk(8 * 1024 * 1024) + .await?; + + let mut written = 0; + for data in blocks.into_iter().flat_map(|x| x.0) { + written += data.len(); + writer.write(data).await?; + } - Ok(DataBlock::new_from_columns(columns)) + writer.close().await?; + written + } + Location::Local(path) => { + let bufs = blocks + .iter() + .flat_map(|x| &x.0) + .map(|data| io::IoSlice::new(data)) + .collect::>(); + + dma_write_file_vectored(path.as_ref(), &bufs).await? + } + }; + debug_assert_eq!(size, written); + Ok(location) } - pub(crate) fn spilled_files(&self) -> Vec { + pub(crate) fn spilled_files(&self) -> Vec { self.columns_layout.keys().cloned().collect() } - - // Serialize data block to (data_size, columns_data, columns_layout). - fn serialize_data_block(&self, data_block: DataBlock) -> Result<(u64, Vec>, Vec)> { - let num_columns = data_block.num_columns(); - let mut data_size = 0; - let mut columns_data = Vec::with_capacity(num_columns); - let mut columns_layout = Vec::with_capacity(num_columns); - - for column in data_block.columns() { - let column = column - .value - .convert_to_full_column(&column.data_type, data_block.num_rows()); - let column_data = serialize_column(&column); - - data_size += column_data.len() as u64; - columns_layout.push(column_data.len() as u64); - columns_data.push(column_data); - } - Ok((data_size, columns_data, columns_layout)) - } } pub enum SpilledData { - Partition(String), + Partition(Location), MergedPartition { - location: String, - partitions: Vec<(usize, Range, Vec)>, + location: Location, + partitions: Vec<(usize, Range, Vec)>, }, } + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum Location { + Remote(String), + Local(TempPath), +} + +pub struct EncodedBlock(pub Vec>); + +impl EncodedBlock { + pub fn from_block(block: DataBlock) -> Self { + let data = block + .columns() + .iter() + .map(|entry| { + let column = entry + .value + .convert_to_full_column(&entry.data_type, block.num_rows()); + serialize_column(&column) + }) + .collect(); + EncodedBlock(data) + } + + pub fn columns_layout(&self) -> Vec { + self.0.iter().map(|data| data.len()).collect() + } + + pub fn size(&self) -> usize { + self.0.iter().map(|data| data.len()).sum() + } +} + +pub fn deserialize_block(columns_layout: &[usize], mut data: &[u8]) -> DataBlock { + let columns = columns_layout + .iter() + .map(|layout| { + let (cur, remain) = data.split_at(*layout); + data = remain; + deserialize_column(cur).unwrap() + }) + .collect::>(); + + DataBlock::new_from_columns(columns) +} + +pub fn record_remote_write_profile(start: &Instant, write_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteBytes, write_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillWriteTime, + 
start.elapsed().as_millis() as usize, + ); +} + +pub fn record_remote_read_profile(start: &Instant, read_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::RemoteSpillReadTime, + start.elapsed().as_millis() as usize, + ); +} + +pub fn record_local_write_profile(start: &Instant, write_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillWriteBytes, write_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::LocalSpillWriteTime, + start.elapsed().as_millis() as usize, + ); +} + +pub fn record_local_read_profile(start: &Instant, read_bytes: usize) { + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1); + Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes); + Profile::record_usize_profile( + ProfileStatisticsName::LocalSpillReadTime, + start.elapsed().as_millis() as usize, + ); +} diff --git a/src/query/service/src/test_kits/fixture.rs b/src/query/service/src/test_kits/fixture.rs index 115606c9b7d4..09fb1534168a 100644 --- a/src/query/service/src/test_kits/fixture.rs +++ b/src/query/service/src/test_kits/fixture.rs @@ -17,7 +17,6 @@ use std::str; use std::sync::Arc; use databend_common_ast::ast::Engine; -use databend_common_base::runtime::drop_guard; use databend_common_catalog::catalog_kind::CATALOG_DEFAULT; use databend_common_catalog::cluster_info::Cluster; use databend_common_config::InnerConfig; @@ -114,7 +113,7 @@ impl Drop for TestGuard { fn drop(&mut self) { #[cfg(debug_assertions)] { - drop_guard(move || { + databend_common_base::runtime::drop_guard(move || { databend_common_base::base::GlobalInstance::drop_testing(&self._thread_name); }) } diff --git a/src/query/service/tests/it/spillers/spiller.rs b/src/query/service/tests/it/spillers/spiller.rs index 387ccd5a4062..ad9779a7d615 100644 --- a/src/query/service/tests/it/spillers/spiller.rs +++ b/src/query/service/tests/it/spillers/spiller.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
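// A minimal sketch (not part of this patch) of how the reworked spiller API above fits
// together: `SpillerConfig` is now a plain struct carrying an optional disk-spill dir,
// `Spiller::create` no longer takes a separate `SpillerType` argument, and reads are
// addressed by the new `Location` enum. It assumes `Spiller::spill` returns that
// `Location` and that the caller already holds the query context and operator; the
// function name below is illustrative only.
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::query_spill_prefix;
use databend_storages_common_cache::TempDirManager;
use opendal::Operator;

use crate::sessions::QueryContext;
use crate::spillers::Location;
use crate::spillers::Spiller;
use crate::spillers::SpillerConfig;
use crate::spillers::SpillerType;

async fn spill_round_trip(
    ctx: Arc<QueryContext>,
    operator: Operator,
    block: DataBlock,
) -> Result<DataBlock> {
    // Disk spill is opt-in: a zero bytes limit yields `None`, keeping remote-only spilling.
    let limit = ctx
        .get_settings()
        .get_window_partition_spilling_to_disk_bytes_limit()?;
    let disk_spill = TempDirManager::instance().get_disk_spill_dir(limit, &ctx.get_id());

    let config = SpillerConfig {
        location_prefix: query_spill_prefix(ctx.get_tenant().tenant_name(), &ctx.get_id()),
        disk_spill,
        spiller_type: SpillerType::Window,
    };
    let mut spiller = Spiller::create(ctx, operator, config)?;

    // The spiller decides per write whether the block lands in a local temp file or on
    // remote storage, and records the columns layout keyed by the returned `Location`.
    let location = spiller.spill(block).await?;
    if let Location::Local(path) = &location {
        // The `TempPath` tracks its own size and unlinks the file when dropped.
        let _bytes_on_disk = path.size();
    }

    // Reading back goes through the same `Location`, so the caller never needs to know
    // which backend was chosen.
    spiller.read_spilled_file(&location).await
}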
+use std::assert_matches::assert_matches; + use databend_common_base::base::tokio; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -24,6 +26,7 @@ use databend_common_expression::FromData; use databend_common_expression::ScalarRef; use databend_common_pipeline_core::query_spill_prefix; use databend_common_storage::DataOperator; +use databend_query::spillers::Location; use databend_query::spillers::Spiller; use databend_query::spillers::SpillerConfig; use databend_query::spillers::SpillerType; @@ -35,11 +38,14 @@ async fn test_spill_with_partition() -> Result<()> { let ctx = fixture.new_query_ctx().await?; let tenant = ctx.get_tenant(); - let spiller_config = - SpillerConfig::create(query_spill_prefix(tenant.tenant_name(), &ctx.get_id())); + let spiller_config = SpillerConfig { + location_prefix: query_spill_prefix(tenant.tenant_name(), &ctx.get_id()), + disk_spill: None, + spiller_type: SpillerType::HashJoinBuild, + }; let operator = DataOperator::instance().operator(); - let mut spiller = Spiller::create(ctx, operator, spiller_config, SpillerType::HashJoinBuild)?; + let mut spiller = Spiller::create(ctx, operator, spiller_config)?; // Generate data block: two columns, type is i32, 100 rows let data = DataBlock::new_from_columns(vec![ @@ -50,7 +56,8 @@ async fn test_spill_with_partition() -> Result<()> { let res = spiller.spill_with_partition(0, data).await; assert!(res.is_ok()); - assert!(spiller.partition_location.get(&0).unwrap()[0].starts_with("_query_spill")); + let location = &spiller.partition_location.get(&0).unwrap()[0]; + assert_matches!(location, Location::Remote(_)); // Test read spilled data let block = DataBlock::concat(&spiller.read_spilled_partition(&(0)).await?)?; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index a1a6ddb005f1..50f9aeb27e5b 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -303,6 +303,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("spilling_to_disk_vacuum_unknown_temp_dirs_limit", DefaultSettingValue { + value: UserSettingValue::UInt64(u64::MAX), + desc: "Set the maximum number of directories to clean up. 
If there are some temporary dirs when another query is unexpectedly interrupted, which needs to be cleaned up after this query.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("enable_merge_into_row_fetch", DefaultSettingValue { value: UserSettingValue::UInt64(1), desc: "Enable merge into row fetch optimization.", @@ -460,6 +466,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=100)), }), + ("window_partition_spilling_to_disk_bytes_limit", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Sets the maximum amount of local disk in bytes that each window partitioner can use before spilling data to storage during query execution.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("window_num_partitions", DefaultSettingValue { value: UserSettingValue::UInt64(256), desc: "Sets the number of partitions for window operator.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index e54806e1550f..55c2e46cd7e5 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -286,6 +286,10 @@ impl Settings { Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize) } + pub fn get_spilling_to_disk_vacuum_unknown_temp_dirs_limit(&self) -> Result { + Ok(self.try_get_u64("spilling_to_disk_vacuum_unknown_temp_dirs_limit")? as usize) + } + pub fn get_inlist_to_join_threshold(&self) -> Result { Ok(self.try_get_u64("inlist_to_join_threshold")? as usize) } @@ -395,6 +399,10 @@ impl Settings { Ok(self.try_get_u64("window_partition_spilling_bytes_threshold_per_proc")? as usize) } + pub fn get_window_partition_spilling_to_disk_bytes_limit(&self) -> Result { + Ok(self.try_get_u64("window_partition_spilling_to_disk_bytes_limit")? as usize) + } + pub fn get_window_partition_spilling_memory_ratio(&self) -> Result { Ok(self.try_get_u64("window_partition_spilling_memory_ratio")? as usize) } diff --git a/src/query/storages/common/cache/Cargo.toml b/src/query/storages/common/cache/Cargo.toml index d8c5b0bc4b08..7cb83f2577ee 100644 --- a/src/query/storages/common/cache/Cargo.toml +++ b/src/query/storages/common/cache/Cargo.toml @@ -31,6 +31,7 @@ hex = "0.4.3" log = { workspace = true } parking_lot = { workspace = true } rayon = "1.9.0" +rustix = "0.38.37" siphasher = "0.3.10" [dev-dependencies] diff --git a/src/query/storages/common/cache/src/lib.rs b/src/query/storages/common/cache/src/lib.rs index 0a7378134b71..8c70b627aeac 100644 --- a/src/query/storages/common/cache/src/lib.rs +++ b/src/query/storages/common/cache/src/lib.rs @@ -14,12 +14,14 @@ #![feature(write_all_vectored)] #![feature(associated_type_defaults)] +#![feature(assert_matches)] mod cache; mod caches; mod manager; mod providers; mod read; +mod temp_dir; pub use cache::CacheAccessor; pub use cache::Unit; @@ -45,3 +47,4 @@ pub use read::InMemoryCacheReader; pub use read::InMemoryItemCacheReader; pub use read::LoadParams; pub use read::Loader; +pub use temp_dir::*; diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs new file mode 100644 index 000000000000..ad23c6a06657 --- /dev/null +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -0,0 +1,409 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fs::create_dir; +use std::fs::create_dir_all; +use std::fs::remove_dir_all; +use std::hash::Hash; +use std::io::ErrorKind; +use std::ops::Deref; +use std::ops::Drop; +use std::path::Path; +use std::path::PathBuf; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::Once; + +use databend_common_base::base::GlobalInstance; +use databend_common_base::base::GlobalUniqName; +use databend_common_config::SpillConfig; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use rustix::fs::statvfs; + +pub struct TempDirManager { + root: Option>, + + // Global limit in bytes + global_limit: usize, + // Reserved disk space in blocks + reserved: u64, + + group: Mutex, +} + +impl TempDirManager { + pub fn init(config: &SpillConfig, tenant_id: &str) -> Result<()> { + let (root, reserved) = if config.path.is_empty() { + (None, 0) + } else { + let path = PathBuf::from(&config.path) + .join(tenant_id) + .into_boxed_path(); + + if let Err(e) = remove_dir_all(&path) { + if !matches!(e.kind(), ErrorKind::NotFound) { + return Err(ErrorCode::StorageUnavailable(format!( + "can't clean temp dir: {e}", + ))); + } + } + + if create_dir_all(&path).is_err() { + (None, 0) + } else { + let stat = + statvfs(path.as_ref()).map_err(|e| ErrorCode::StorageOther(e.to_string()))?; + let reserved = (stat.f_blocks as f64 * *config.reserved_disk_ratio) as u64; + + (Some(path), reserved) + } + }; + + GlobalInstance::set(Arc::new(Self { + root, + global_limit: config.global_bytes_limit as usize, + reserved, + group: Mutex::new(Group { + dirs: HashMap::new(), + }), + })); + Ok(()) + } + + pub fn instance() -> Arc { + GlobalInstance::get() + } + + pub fn get_disk_spill_dir( + self: &Arc, + limit: usize, + query_id: &str, + ) -> Option> { + if limit == 0 { + return None; + } + + let path = self.root.as_ref()?.join(query_id).into_boxed_path(); + let mut group = self.group.lock().unwrap(); + let dir = match group.dirs.entry(path.clone()) { + Entry::Occupied(o) => TempDir { + path, + dir_info: o.get().clone(), + manager: self.clone(), + }, + Entry::Vacant(v) => { + let dir_info = Arc::new(DirInfo { + limit, + count: Default::default(), + size: Default::default(), + inited: Once::new(), + }); + v.insert(dir_info.clone()); + TempDir { + path, + dir_info, + manager: self.clone(), + } + } + }; + Some(Arc::new(dir)) + } + + pub fn drop_disk_spill_dir(self: &Arc, query_id: &str) -> Result { + let path = match self.root.as_ref() { + None => return Ok(false), + Some(root) => root.join(query_id).into_boxed_path(), + }; + + let mut group = self.group.lock().unwrap(); + if group.dirs.remove(&path).is_some() { + match remove_dir_all(&path) { + Ok(_) => return Ok(true), + Err(e) if matches!(e.kind(), ErrorKind::NotFound) => {} + res => res?, + } + } + Ok(false) + } + + pub fn drop_disk_spill_dir_unknown( + self: &Arc, + limit: usize, + ) -> Result>> { + match self.root.as_ref() { + None => Ok(vec![]), + Some(root) => { + let 
read_dir = std::fs::read_dir(root)?; + let group = self.group.lock().unwrap(); + let to_delete = read_dir + .filter_map(|entry| match entry { + Err(_) => None, + Ok(entry) => { + let path = entry.path().into_boxed_path(); + if group.dirs.contains_key(&path) { + None + } else { + Some(path) + } + } + }) + .take(limit) + .collect::>(); + drop(group); + for path in &to_delete { + remove_dir_all(path)?; + } + Ok(to_delete) + } + } + } + + fn insufficient_disk(&self, size: u64) -> Result { + let stat = statvfs(self.root.as_ref().unwrap().as_ref()) + .map_err(|e| ErrorCode::Internal(e.to_string()))?; + Ok(stat.f_bavail < self.reserved + (size + stat.f_frsize - 1) / stat.f_frsize) + } +} + +struct Group { + dirs: HashMap, Arc>, +} + +impl Group { + fn size(&self) -> usize { + self.dirs.values().map(|v| *v.size.lock().unwrap()).sum() + } +} + +#[derive(Clone)] +pub struct TempDir { + path: Box, + dir_info: Arc, + manager: Arc, +} + +impl TempDir { + pub fn new_file_with_size(&self, size: usize) -> Result> { + let path = self.path.join(GlobalUniqName::unique()).into_boxed_path(); + + if self.dir_info.limit < *self.dir_info.size.lock().unwrap() + size + || self.manager.global_limit < self.manager.group.lock().unwrap().size() + size + || self.manager.insufficient_disk(size as u64)? + { + return Ok(None); + } + + let mut dir_size = self.dir_info.size.lock().unwrap(); + if self.dir_info.limit < *dir_size + size { + return Ok(None); + } + + *dir_size += size; + drop(dir_size); + + self.init_dir()?; + + let dir_info = self.dir_info.clone(); + dir_info.count.fetch_add(1, Ordering::SeqCst); + + Ok(Some(TempPath(Arc::new(InnerPath { + path, + size, + dir_info, + })))) + } + + fn init_dir(&self) -> Result<()> { + let mut rt = Ok(()); + self.dir_info.inited.call_once(|| { + if let Err(e) = create_dir(&self.path) { + if !matches!(e.kind(), ErrorKind::AlreadyExists) { + rt = Err(e); + } + } + }); + Ok(rt?) 
+ } +} + +struct DirInfo { + limit: usize, + count: AtomicUsize, + size: Mutex, + inited: Once, +} + +impl Debug for DirInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DirInfo") + .field("limit", &self.limit) + .field("count", &self.count) + .field("size", &self.size) + .field("inited", &self.inited.is_completed()) + .finish() + } +} + +#[derive(Clone)] +pub struct TempPath(Arc); + +impl Debug for TempPath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TempPath") + .field("path", &self.0.path) + .field("size", &self.0.size) + .field("dir_info", &self.0.dir_info) + .finish() + } +} + +impl Hash for TempPath { + fn hash(&self, state: &mut H) { + self.0.path.hash(state); + } +} + +impl PartialEq for TempPath { + fn eq(&self, other: &Self) -> bool { + self.0.path == other.0.path + } +} + +impl Eq for TempPath {} + +impl AsRef for TempPath { + fn as_ref(&self) -> &Path { + self.0.path.as_ref() + } +} + +impl Deref for TempPath { + type Target = Path; + + fn deref(&self) -> &Path { + self.as_ref() + } +} + +impl TempPath { + pub fn size(&self) -> usize { + self.0.size + } +} + +struct InnerPath { + path: Box, + size: usize, + dir_info: Arc, +} + +impl Drop for InnerPath { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.path); + + self.dir_info.count.fetch_sub(1, Ordering::SeqCst); + let mut guard = self.dir_info.size.lock().unwrap(); + *guard -= self.size; + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::fs; + use std::sync::atomic::Ordering; + + use super::*; + + #[test] + fn test_temp_dir() -> Result<()> { + let thread = std::thread::current(); + GlobalInstance::init_testing(thread.name().unwrap()); + + let config = SpillConfig { + path: "test_data".to_string(), + reserved_disk_ratio: 0.01.into(), + global_bytes_limit: 1 << 30, + }; + + TempDirManager::init(&config, "test_tenant")?; + + let mgr = TempDirManager::instance(); + let dir = mgr.get_disk_spill_dir(1 << 30, "some_query").unwrap(); + let path = dir.new_file_with_size(100)?.unwrap(); + + println!("{:?}", &path); + + fs::write(&path, vec![b'a'; 100])?; + + assert_eq!(1, dir.dir_info.count.load(Ordering::Relaxed)); + assert_eq!(100, *dir.dir_info.size.lock().unwrap()); + + let path_str = path.as_ref().to_str().unwrap().to_string(); + drop(path); + + assert_eq!(0, dir.dir_info.count.load(Ordering::Relaxed)); + assert_eq!(0, *dir.dir_info.size.lock().unwrap()); + + assert_matches!(fs::read_to_string(path_str), Err(_)); + + mgr.drop_disk_spill_dir("some_query")?; + + remove_dir_all("test_data")?; + + Ok(()) + } + + #[test] + fn test_drop_disk_spill_dir_unknown() -> Result<()> { + let thread = std::thread::current(); + GlobalInstance::init_testing(thread.name().unwrap()); + + let config = SpillConfig { + path: "test_data2".to_string(), + reserved_disk_ratio: 0.99.into(), + global_bytes_limit: 1 << 30, + }; + + TempDirManager::init(&config, "test_tenant")?; + + let mgr = TempDirManager::instance(); + mgr.get_disk_spill_dir(1 << 30, "some_query").unwrap(); + + create_dir("test_data2/test_tenant/unknown_query1")?; + create_dir("test_data2/test_tenant/unknown_query2")?; + + let mut deleted = mgr.drop_disk_spill_dir_unknown(10)?; + + deleted.sort(); + + assert_eq!( + vec![ + PathBuf::from("test_data2/test_tenant/unknown_query1").into_boxed_path(), + PathBuf::from("test_data2/test_tenant/unknown_query2").into_boxed_path(), + ], + deleted + ); + + remove_dir_all("test_data2")?; + + Ok(()) + } +} diff --git 
a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test index 2f268148905e..fccbd097486a 100644 --- a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test +++ b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test @@ -7,6 +7,9 @@ USE test_window_partition_spill statement ok set window_partition_spilling_bytes_threshold_per_proc = 1024 * 1024 * 1; +statement ok +set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; + query T SELECT SUM(number + a + b) FROM ( diff --git a/tests/sqllogictests/suites/tpcds/spill.test b/tests/sqllogictests/suites/tpcds/spill.test index 9446d0209a66..366c85121b6d 100644 --- a/tests/sqllogictests/suites/tpcds/spill.test +++ b/tests/sqllogictests/suites/tpcds/spill.test @@ -23,6 +23,9 @@ set sort_spilling_bytes_threshold_per_proc = 1; statement ok set window_partition_spilling_memory_ratio = 1; +statement ok +set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; + statement ok set window_partition_spilling_bytes_threshold_per_proc = 1; diff --git a/tests/sqllogictests/suites/tpch/spill.test b/tests/sqllogictests/suites/tpch/spill.test index 757154bbe84b..c393f2082b61 100644 --- a/tests/sqllogictests/suites/tpch/spill.test +++ b/tests/sqllogictests/suites/tpch/spill.test @@ -23,6 +23,9 @@ set sort_spilling_bytes_threshold_per_proc = 1; statement ok set window_partition_spilling_memory_ratio = 1; +statement ok +set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; + statement ok set window_partition_spilling_bytes_threshold_per_proc = 1;
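// A hedged sketch (not part of this patch) of the per-query temp-dir lifecycle behind the
// settings exercised above, assuming `TempDirManager::init` has been called with the
// node's `SpillConfig` during startup. The manager tracks one spill directory per query
// and can vacuum directories left behind by interrupted queries; the two helper
// functions and their names below are illustrative only.
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_settings::Settings;
use databend_storages_common_cache::TempDir;
use databend_storages_common_cache::TempDirManager;

fn acquire_spill_dir(settings: &Settings, query_id: &str) -> Result<Option<Arc<TempDir>>> {
    let mgr = TempDirManager::instance();

    // Best-effort cleanup of temp dirs whose owning query was interrupted, bounded by
    // the new `spilling_to_disk_vacuum_unknown_temp_dirs_limit` setting.
    let vacuum_limit = settings.get_spilling_to_disk_vacuum_unknown_temp_dirs_limit()?;
    let _removed = mgr.drop_disk_spill_dir_unknown(vacuum_limit)?;

    // With the default `window_partition_spilling_to_disk_bytes_limit = 0`, disk spill is
    // disabled and `get_disk_spill_dir` returns `None`, so spilling stays remote-only.
    let limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?;
    Ok(mgr.get_disk_spill_dir(limit, query_id))
}

fn release_spill_dir(query_id: &str) -> Result<()> {
    // Individual spill files are unlinked when their `TempPath`s drop; this removes the
    // per-query directory itself once the query has finished.
    TempDirManager::instance().drop_disk_spill_dir(query_id)?;
    Ok(())
}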