Skip to content

Commit

Permalink
Default to old way of external sorting in the absence of a LIMIT clause
Browse files: browse the repository at this point in the history
  • Loading branch information
gruuya committed Aug 2, 2023
1 parent 39d6d16 commit dcba28f
Showing 1 changed file with 115 additions and 45 deletions.
160 changes: 115 additions & 45 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! It will do in-memory sorting if it has enough memory budget
//! but spills to disk if needed.

use crate::physical_plan::common::{batch_byte_size, IPCWriter};
use crate::physical_plan::common::{batch_byte_size, spawn_buffered, IPCWriter};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
Expand Down Expand Up @@ -79,15 +79,15 @@ impl ExternalSorterMetrics {
/// Sorts data of arbitrary size to get a total order (may spill several times during sorting, depending on the free memory available).
///
/// The basic architecture of the algorithm:
/// 1. get a non-empty new batch from input and sort it
/// 1. get a non-empty new batch from input
/// 2. check with the memory manager if we could buffer the batch in memory
/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
/// 2.2 if the memory threshold is reached, spill all buffered batches and to file.
/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file.
/// buffer the batch in memory, go to 1.
/// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
struct ExternalSorter {
schema: SchemaRef,
in_mem_batches: Vec<RecordBatch>,
in_mem_batches: Vec<(bool, RecordBatch)>,
spills: Vec<NamedTempFile>,
/// Sort expressions
expr: Arc<[PhysicalSortExpr]>,
Expand Down Expand Up @@ -136,27 +136,46 @@ impl ExternalSorter {
return Ok(());
}

// Eagerly sort the batch to potentially reduce the number of rows
// after applying the fetch parameter; first perform a memory reservation
// for the sorting procedure.
let mut reservation =
MemoryConsumer::new(format!("insert_batch{}", self.partition_id))
.register(&self.runtime.memory_pool);

// TODO: This should probably be try_grow (#5885)
reservation.resize(input.get_array_memory_size());
// Maybe we should keep a single batch at all times and perform
// concatenate with the incoming batch + sort instead?
input = sort_batch(&input, &self.expr, self.fetch)?;
reservation.free();
let mut batch_sorted = false;
if self.fetch.map_or(false, |f| f < input.num_rows()) {
// Eagerly sort the batch to potentially reduce the number of rows
// after applying the fetch parameter; first perform a memory reservation
// for the sorting procedure.
let mut reservation =
MemoryConsumer::new(format!("insert_batch{}", self.partition_id))
.register(&self.runtime.memory_pool);

// TODO: This should probably be try_grow (#5885)
reservation.resize(input.get_array_memory_size());
// Maybe we should keep a single batch at all times and perform
// concatenate with the incoming batch + sort instead?
input = sort_batch(&input, &self.expr, self.fetch)?;
reservation.free();
batch_sorted = true;
}

let size = batch_byte_size(&input);
if self.reservation.try_grow(size).is_err() {
self.spill().await?;
self.reservation.try_grow(size)?
let before = self.reservation.size();
self.in_mem_sort().await?;
// Sorting may have freed memory, especially if fetch is not `None`
//
// As such we check again, and if the memory usage has dropped by
// a factor of 2, and we can allocate the necessary capacity,
// we don't spill
//
// The factor of 2 aims to avoid a degenerate case where the
// memory required for `fetch` is just under the memory available,
// causing repeated re-sorting of data
if self.reservation.size() > before / 2
|| self.reservation.try_grow(size).is_err()
{
self.spill().await?;
self.reservation.try_grow(size)?
}
}

self.in_mem_batches.push(input);
self.in_mem_batches.push((batch_sorted, input));
Ok(())
}

Expand All @@ -165,12 +184,12 @@ impl ExternalSorter {
}

/// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
fn merge(&mut self) -> Result<SendableRecordBatchStream> {
fn sort(&mut self) -> Result<SendableRecordBatchStream> {
if self.spilled_before() {
let mut streams = vec![];
if !self.in_mem_batches.is_empty() {
let in_mem_stream =
self.in_mem_stream(self.metrics.baseline.intermediate())?;
self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
streams.push(in_mem_stream);
}

Expand All @@ -188,7 +207,7 @@ impl ExternalSorter {
self.fetch,
)
} else if !self.in_mem_batches.is_empty() {
let result = self.in_mem_stream(self.metrics.baseline.clone());
let result = self.in_mem_sort_stream(self.metrics.baseline.clone());
// Report to the memory manager we are no longer using memory
self.reservation.free();
result
Expand Down Expand Up @@ -217,11 +236,13 @@ impl ExternalSorter {

debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

self.in_mem_sort().await?;

let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;

let batches = self.in_mem_stream(self.metrics.baseline.intermediate())?
.try_collect()
.await?;
let (sorted, batches): (Vec<bool>, Vec<RecordBatch>) =
std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
assert_eq!(sorted.iter().all(|&s| s), true);

spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
let used = self.reservation.free();
Expand All @@ -231,15 +252,39 @@ impl ExternalSorter {
Ok(used)
}

/// Consumes in_mem_batches returning a stream
fn in_mem_stream(
/// Sorts the in_mem_batches in place.
///
/// Returns immediately when every buffered batch is already flagged as sorted.
/// Otherwise the buffer is drained through `in_mem_sort_stream`, the collected
/// output is stored back with each batch flagged as sorted, and
/// `self.reservation` is resized to the summed `get_array_memory_size` of the
/// stored batches.
///
/// Errors from creating or collecting the sorted stream are propagated.
async fn in_mem_sort(&mut self) -> Result<()> {
// Fast path: nothing to do when every batch carries the `sorted` flag
// (this also holds for an empty buffer).
if self.in_mem_batches.iter().all(|(sorted, _)| *sorted) {
return Ok(());
}

// `in_mem_sort_stream` consumes `in_mem_batches`; collect its output and
// store it back, marking every resulting batch as sorted.
self.in_mem_batches = self
.in_mem_sort_stream(self.metrics.baseline.intermediate())?
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(|batch| (true, batch))
.collect();

// Recompute the memory held by the buffered batches after sorting.
let size: usize = self
.in_mem_batches
.iter()
.map(|(_, x)| x.get_array_memory_size())
.sum();

// Bring the reservation in line with the recomputed size; the caller in
// `insert_batch` compares the reservation size before and after this call.
self.reservation.resize(size);
Ok(())
}

/// Consumes in_mem_batches returning a sorted stream
fn in_mem_sort_stream(
&mut self,
metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(self.in_mem_batches.len(), 0);
if self.in_mem_batches.len() == 1 {
let batch = self.in_mem_batches.remove(0);
let stream = self.sort_batch_stream(batch, metrics)?;
let (sorted, batch) = self.in_mem_batches.remove(0);
let stream = self.sort_batch_stream(batch, sorted, metrics)?;
self.in_mem_batches.clear();
return Ok(stream);
}
Expand All @@ -249,17 +294,22 @@ impl ExternalSorter {
// This is a very rough heuristic and likely could be refined further
if self.reservation.size() < 1048576 {
// Concatenate memory batches together and sort
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
let (sorted, batches): (Vec<bool>, Vec<RecordBatch>) =
std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
let batch = concat_batches(&self.schema, &batches)?;
let sorted = sorted.iter().all(|&s| s);
self.in_mem_batches.clear();
let output = sort_batch(&batch, &self.expr, self.fetch)?;
return self.sort_batch_stream(output, metrics);
return self.sort_batch_stream(batch, sorted, metrics);
}

let streams = std::mem::take(&mut self.in_mem_batches)
.into_iter()
.map(|batch| {
.map(|(sorted, batch)| {
let metrics = self.metrics.baseline.intermediate();
Ok(self.sort_batch_stream(batch, metrics)?)
Ok(spawn_buffered(
self.sort_batch_stream(batch, sorted, metrics)?,
1,
))
})
.collect::<Result<_>>()?;

Expand All @@ -276,14 +326,34 @@ impl ExternalSorter {
fn sort_batch_stream(
&self,
batch: RecordBatch,
sorted: bool,
metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream> {
let schema = batch.schema();
let stream = futures::stream::once(futures::future::lazy(move |_| {
metrics.record_output(batch.num_rows());
Ok(batch)
}));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))

if !sorted {
// Reserve some memory for sorting the batch
let mut reservation =
MemoryConsumer::new(format!("sort_batch_stream{}", self.partition_id))
.register(&self.runtime.memory_pool);

// TODO: This should probably be try_grow (#5885)
reservation.resize(batch.get_array_memory_size());

let fetch = self.fetch;
let expressions = self.expr.clone();
let stream = futures::stream::once(futures::future::lazy(move |_| {
let output = sort_batch(&batch, &expressions, fetch)?;
metrics.record_output(output.num_rows());
drop(batch);
reservation.free();
Ok(output)
}));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
} else {
let stream = futures::stream::once(futures::future::lazy(move |_| Ok(batch)));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}
}

Expand Down Expand Up @@ -424,8 +494,8 @@ impl SortExec {
/// Create a new sort execution plan with the option to preserve
/// the partitioning of the input plan
#[deprecated(
since = "22.0.0",
note = "use `new`, `with_fetch` and `with_preserve_partioning` instead"
since = "22.0.0",
note = "use `new`, `with_fetch` and `with_preserve_partioning` instead"
)]
pub fn new_with_partitioning(
expr: Vec<PhysicalSortExpr>,
Expand Down Expand Up @@ -593,9 +663,9 @@ impl ExecutionPlan for SortExec {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
sorter.merge()
sorter.sort()
})
.try_flatten(),
.try_flatten(),
)))
}

Expand Down Expand Up @@ -799,7 +869,7 @@ mod tests {
],
Arc::new(CoalescePartitionsExec::new(csv)),
)
.with_fetch(fetch),
.with_fetch(fetch),
);

let task_ctx = session_ctx.task_ctx();
Expand Down

0 comments on commit dcba28f

Please sign in to comment.