Skip to content

Commit

Permalink
Fix the edge case of sorting multiple batches with less than 1MB of c…
Browse files Browse the repository at this point in the history
…ombined size
  • Loading branch information
gruuya committed Aug 3, 2023
1 parent dcba28f commit bcf1318
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ impl ExternalSorter {

// TODO: This should probably be try_grow (#5885)
reservation.resize(input.get_array_memory_size());
// Maybe we should keep a single batch at all times and perform
// concatenate with the incoming batch + sort instead?
// Maybe we should perform sorting in a parallel task to unblock the caller
input = sort_batch(&input, &self.expr, self.fetch)?;
reservation.free();
batch_sorted = true;
Expand Down Expand Up @@ -242,7 +241,7 @@ impl ExternalSorter {

let (sorted, batches): (Vec<bool>, Vec<RecordBatch>) =
std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
assert_eq!(sorted.iter().all(|&s| s), true);
assert!(sorted.iter().all(|&s| s));

spill_sorted_batches(batches, spillfile.path(), self.schema.clone()).await?;
let used = self.reservation.free();
Expand Down Expand Up @@ -294,12 +293,13 @@ impl ExternalSorter {
// This is a very rough heuristic and likely could be refined further
if self.reservation.size() < 1048576 {
// Concatenate memory batches together and sort
let (sorted, batches): (Vec<bool>, Vec<RecordBatch>) =
let (_, batches): (Vec<bool>, Vec<RecordBatch>) =
std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
let batch = concat_batches(&self.schema, &batches)?;
let sorted = sorted.iter().all(|&s| s);
self.in_mem_batches.clear();
return self.sort_batch_stream(batch, sorted, metrics);
// Even if all individual batches were themselves sorted the resulting concatenated one
// isn't guaranteed to be sorted, so we must perform sorting on the stream.
return self.sort_batch_stream(batch, false, metrics);
}

let streams = std::mem::take(&mut self.in_mem_batches)
Expand Down

0 comments on commit bcf1318

Please sign in to comment.