Skip to content
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e1e1d09
Bring over apache/arrow-rs/9683, integrate into sorts, add heuristic …
mbutrovich Apr 9, 2026
9371dd1
Merge branch 'main' into arrow_rs_9683
mbutrovich Apr 13, 2026
fc35b08
Stash with implementation, need to fix accounting for one test.
mbutrovich Apr 13, 2026
d822a50
Fix more tests.
mbutrovich Apr 13, 2026
54be475
Tests pass.
mbutrovich Apr 13, 2026
a42ba49
Cleanup.
mbutrovich Apr 13, 2026
24dbdb7
More cleanup.
mbutrovich Apr 13, 2026
e83ffc6
More tests.
mbutrovich Apr 13, 2026
60fbdfb
More tests.
mbutrovich Apr 13, 2026
8e8f774
Cleanup before pushing.
mbutrovich Apr 13, 2026
aa93a6e
Fix CI failures.
mbutrovich Apr 13, 2026
af514fe
Fix configs.md.
mbutrovich Apr 13, 2026
c527553
Fix CI failures.
mbutrovich Apr 13, 2026
83860c0
Avoid radix sort for decimal types for now.
mbutrovich Apr 14, 2026
a88eacf
Fix information_schema.slt test.
mbutrovich Apr 14, 2026
175dcf6
Update to latest radix kernel from arrow-rs PR.
mbutrovich Apr 14, 2026
4bdf871
Use lexsort for single columns, radix otherwise. Should help with Q11…
mbutrovich Apr 14, 2026
a1f0193
Address some PR feedback.
mbutrovich Apr 14, 2026
bbda50f
Address some PR feedback.
mbutrovich Apr 14, 2026
2698b85
Update failing test.
mbutrovich Apr 14, 2026
d5d3ef7
Update failing test for realsies.
mbutrovich Apr 14, 2026
e0fb0a8
## `datafusion/physical-plan/src/sorts/sort.rs`
mbutrovich Apr 14, 2026
73bb06b
Cleanup.
mbutrovich Apr 14, 2026
7a75c38
Fix copy.slt test to show new metrics.
mbutrovich Apr 14, 2026
482e72c
Temporarily default radix to false to get benchmarks.
mbutrovich Apr 14, 2026
96dd0df
Revert "Temporarily default radix to false to get benchmarks."
mbutrovich Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,22 @@ config_namespace! {
/// When sorting, below what size should data be concatenated
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
///
/// Deprecated: this option is no longer used. The sort pipeline
/// now always coalesces batches before sorting. Use
/// `sort_coalesce_target_rows` instead.
pub sort_in_place_threshold_bytes: usize, warn = "`sort_in_place_threshold_bytes` is deprecated and ignored. Use `sort_coalesce_target_rows` instead.", default = 1024 * 1024

/// Target number of rows to coalesce before sorting in ExternalSorter.
///
/// Larger values give radix sort (used for primitives and strings)
/// enough rows to amortize RowConverter encoding overhead. Under
/// memory pressure the actual chunk size may be smaller.
///
/// For schemas that are not eligible for radix sort (all-dictionary
/// or nested types), the coalesce target falls back to `batch_size`
/// regardless of this setting.
pub sort_coalesce_target_rows: usize, default = 32768

/// Maximum buffer capacity (in bytes) per partition for BufferExec
/// inserted during sort pushdown optimization.
Expand Down
18 changes: 1 addition & 17 deletions datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,6 @@ impl SortFuzzerTestGenerator {
..=(per_partition_mem_limit as f64 * 0.3) as usize,
);

// 1 to 3 times the approx batch size. Setting this to a very large value
// will cause external sort to fail.
let sort_in_place_threshold_bytes = if with_memory_limit {
// For memory-limited query, setting `sort_in_place_threshold_bytes` too
// large will cause failure.
0
} else {
let dataset_size = self.dataset_state.as_ref().unwrap().dataset_size;
rng.random_range(0..=dataset_size * 2_usize)
};

// Set up strings for printing
let memory_limit_str = if with_memory_limit {
human_readable_size(memory_limit)
Expand All @@ -530,16 +519,11 @@ impl SortFuzzerTestGenerator {
" Sort spill reservation bytes: {}",
human_readable_size(sort_spill_reservation_bytes)
);
println!(
" Sort in place threshold bytes: {}",
human_readable_size(sort_in_place_threshold_bytes)
);

let config = SessionConfig::new()
.with_target_partitions(num_partitions)
.with_batch_size(init_state.approx_batch_num_rows / 2)
.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes)
.with_sort_in_place_threshold_bytes(sort_in_place_threshold_bytes);
.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);

let memory_pool: Arc<dyn MemoryPool> = if with_memory_limit {
Arc::new(FairSpillPool::new(memory_limit))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ async fn test_sort_with_limited_memory() -> Result<()> {
})
.await?;

let total_spill_files_size = spill_count * record_batch_size;
// The chunked sort pipeline is more memory-efficient (shrinks
// reservations after sorting), so total spill size may be less than
// pool size. Just verify that spilling occurred.
assert!(
total_spill_files_size > pool_size,
"Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
spill_count > 0,
"Expected spilling under memory pressure, but spill_count was 0",
);

Ok(())
Expand Down
24 changes: 7 additions & 17 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ async fn sort_spill_reservation() {
let scenario = Scenario::new_dictionary_strings(1);
let partition_size = scenario.partition_size();

let base_config = SessionConfig::new()
// do not allow the sort to use the 'concat in place' path
.with_sort_in_place_threshold_bytes(10);
let base_config = SessionConfig::new();

// This test case shows how sort_spill_reservation works by
// purposely sorting data that requires non trivial memory to
Expand Down Expand Up @@ -313,26 +311,19 @@ async fn sort_spill_reservation() {
]
);

let config = base_config
.clone()
// provide insufficient reserved space for merging,
// the sort will fail while trying to merge
.with_sort_spill_reservation_bytes(1024);
// With low reservation, the sort should still succeed because
// the chunked sort pipeline eagerly sorts and the multi-level merge
// handles low merge memory by reducing fan-in.
let config = base_config.clone().with_sort_spill_reservation_bytes(1024);

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:",
"B for ExternalSorterMerge",
])
.with_expected_success()
.with_config(config)
.run()
.await;

let config = base_config
// reserve sufficient space up front for merge and this time,
// which will force the spills to happen with less buffered
// input and thus with enough to merge.
// reserve sufficient space up front for merge
.with_sort_spill_reservation_bytes(mem_limit / 2);

test.with_config(config).with_expected_success().run().await;
Expand Down Expand Up @@ -583,7 +574,6 @@ async fn setup_context(

let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(64 * 1024) // 256KB
.with_sort_in_place_threshold_bytes(0)
.with_spill_compression(spill_compression)
.with_batch_size(64) // To reduce test memory usage
.with_target_partitions(1);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async fn test_invalid_memory_limit_when_limit_is_not_numeric() {
async fn test_max_temp_directory_size_enforcement() {
let ctx = SessionContext::new();

ctx.sql("SET datafusion.runtime.memory_limit = '1M'")
ctx.sql("SET datafusion.runtime.memory_limit = '256K'")
.await
.unwrap()
.collect()
Expand Down
24 changes: 24 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,15 @@ impl SessionConfig {
/// Set the size of [`sort_in_place_threshold_bytes`] to control
/// how sort does things.
///
/// Deprecated: this option is no longer used. Use
/// [`with_sort_coalesce_target_rows`] instead.
///
/// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
/// [`with_sort_coalesce_target_rows`]: Self::with_sort_coalesce_target_rows
#[deprecated(
since = "46.0.0",
note = "No longer used. Sort pipeline now coalesces batches before sorting. Use with_sort_coalesce_target_rows instead."
)]
pub fn with_sort_in_place_threshold_bytes(
mut self,
sort_in_place_threshold_bytes: usize,
Expand All @@ -465,6 +473,22 @@ impl SessionConfig {
self
}

/// Set the target number of rows to coalesce before sorting.
///
/// Larger values give radix sort enough rows to amortize encoding
/// overhead (2-3x faster at 32K+ rows). Under memory pressure the
/// actual chunk size may be smaller.
///
/// [`sort_coalesce_target_rows`]: datafusion_common::config::ExecutionOptions::sort_coalesce_target_rows
pub fn with_sort_coalesce_target_rows(
mut self,
sort_coalesce_target_rows: usize,
) -> Self {
self.options_mut().execution.sort_coalesce_target_rows =
sort_coalesce_target_rows;
self
}

/// Enables or disables the enforcement of batch size in joins
pub fn with_enforce_batch_size_in_joins(
mut self,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ impl GroupedHashAggregateStream {
emit,
self.spill_state.spill_expr.clone(),
self.batch_size,
false,
);
let spillfile = self
.spill_state
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod cursor;
mod merge;
mod multi_level_merge;
pub mod partial_sort;
mod radix;
pub mod sort;
pub mod sort_preserving_merge;
mod stream;
Expand Down
185 changes: 185 additions & 0 deletions datafusion/physical-plan/src/sorts/radix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// TODO: replace with arrow_row::radix::radix_sort_to_indices once
// available in arrow-rs (see https://github.com/apache/arrow-rs/pull/9683)

//! MSD radix sort on row-encoded keys.

use arrow::array::UInt32Array;
use arrow::row::{RowConverter, Rows, SortField};
use arrow_ord::sort::SortColumn;
use std::sync::Arc;

/// Buckets smaller than this fall back to comparison sort.
///
/// Rationale: below roughly this size, the fixed per-pass overhead of a
/// radix level (256-entry histogram + prefix offsets) presumably exceeds
/// the cost of a direct comparison sort of the bucket — tune if profiled.
const FALLBACK_THRESHOLD: usize = 32;

/// Maximum number of radix passes before falling back to comparison sort.
///
/// Keys that are identical in their first `MAX_DEPTH` encoded bytes are
/// finished with a comparison sort on the remaining suffix, bounding
/// recursion depth for long shared prefixes.
const MAX_DEPTH: usize = 8;

/// Sort row indices using MSD radix sort on row-encoded keys.
///
/// Converts the sort columns into the arrow row format, then radix-sorts
/// the row indices by their encoded bytes.
///
/// Returns a `UInt32Array` of row indices in sorted order.
pub(crate) fn radix_sort_to_indices(
    sort_columns: &[SortColumn],
) -> arrow::error::Result<UInt32Array> {
    // Build the sort-field descriptors and collect the column arrays in
    // a single pass over the inputs.
    let mut sort_fields = Vec::with_capacity(sort_columns.len());
    let mut arrays = Vec::with_capacity(sort_columns.len());
    for col in sort_columns {
        sort_fields.push(SortField::new_with_options(
            col.values.data_type().clone(),
            col.options.unwrap_or_default(),
        ));
        arrays.push(Arc::clone(&col.values));
    }

    // Encode all columns into comparable byte rows.
    let rows = RowConverter::new(sort_fields)?.convert_columns(&arrays)?;

    let num_rows = rows.num_rows();
    let mut indices: Vec<u32> = (0..num_rows as u32).collect();
    // Scratch buffers: ping-pong index buffer and per-pass radix bytes.
    let mut scratch = vec![0u32; num_rows];
    let mut byte_buf = vec![0u8; num_rows];
    msd_radix_sort(&mut indices, &mut scratch, &mut byte_buf, &rows, 0, true);
    Ok(UInt32Array::from(indices))
}

/// The byte at `byte_pos` in the row, or 0 if past the end.
///
/// Inline helper until `Row::byte_from` is available in released arrow-row.
///
/// # Safety
///
/// `idx` must be less than `rows.num_rows()`.
#[inline(always)]
unsafe fn row_byte(rows: &Rows, idx: u32, byte_pos: usize) -> u8 {
    // SAFETY: the caller guarantees `idx` is in bounds for `rows`.
    let row = unsafe { rows.row_unchecked(idx as usize) };
    // Checked slice access replaces the manual bounds test + unchecked
    // read: `get` already encodes "in range or past the end".
    row.data().get(byte_pos).copied().unwrap_or(0)
}

/// Row data starting at `byte_pos`, or an empty slice if past the end.
///
/// Inline helper until `Row::data_from` is available in released arrow-row.
///
/// # Safety
///
/// `idx` must be less than `rows.num_rows()`.
#[inline(always)]
unsafe fn row_data_from(rows: &Rows, idx: u32, byte_pos: usize) -> &[u8] {
    // SAFETY: the caller guarantees `idx` is in bounds for `rows`.
    let row = unsafe { rows.row_unchecked(idx as usize) };
    // `get(byte_pos..)` yields `None` only when `byte_pos > len`; map
    // that to an empty slice instead of panicking, matching the original
    // `byte_pos <= len` guard.
    row.data().get(byte_pos..).unwrap_or(&[])
}

/// MSD radix sort using ping-pong buffers.
///
/// Each level scatters from `src` into `dst`, then recurses with the
/// roles swapped (dst becomes the next level's src). This avoids an
/// O(n) `copy_from_slice` at every recursion level.
///
/// Parameters:
/// - `src`: the row indices to order at this level.
/// - `dst`: scratch slice of the same length as `src`.
/// - `bytes`: scratch buffer (same length as `src`) caching each row's
///   radix byte for the current pass.
/// - `rows`: the row-encoded sort keys.
/// - `byte_pos`: byte offset within each encoded row examined at this level.
///
/// `result_in_src` tracks where the caller expects the sorted output:
/// true means `src`, false means `dst`.
fn msd_radix_sort(
    src: &mut [u32],
    dst: &mut [u32],
    bytes: &mut [u8],
    rows: &Rows,
    byte_pos: usize,
    result_in_src: bool,
) {
    let n = src.len();

    // Small buckets or deep recursion: finish with comparison sort.
    if n <= FALLBACK_THRESHOLD || byte_pos >= MAX_DEPTH {
        // Compare only from byte_pos onward — earlier bytes are identical
        // within this bucket, having already been discriminated by prior
        // radix passes.
        if result_in_src {
            src.sort_unstable_by(|&a, &b| {
                // SAFETY: indices originate from 0..rows.num_rows().
                let ra = unsafe { row_data_from(rows, a, byte_pos) };
                let rb = unsafe { row_data_from(rows, b, byte_pos) };
                ra.cmp(rb)
            });
        } else {
            // Caller expects the result in `dst`: move, then sort there.
            dst.copy_from_slice(src);
            dst.sort_unstable_by(|&a, &b| {
                // SAFETY: indices originate from 0..rows.num_rows().
                let ra = unsafe { row_data_from(rows, a, byte_pos) };
                let rb = unsafe { row_data_from(rows, b, byte_pos) };
                ra.cmp(rb)
            });
        }
        return;
    }

    // Extract bytes and build histogram in one pass. The bytes buffer
    // avoids chasing pointers through Rows a second time during scatter.
    let bytes = &mut bytes[..n];
    let mut counts = [0u32; 256];
    for (i, &idx) in src.iter().enumerate() {
        // SAFETY: indices originate from 0..rows.num_rows().
        let b = unsafe { row_byte(rows, idx, byte_pos) };
        bytes[i] = b;
        counts[b as usize] += 1;
    }

    // Exclusive prefix sum: offsets[b]..offsets[b+1] is bucket b's range.
    let mut offsets = [0u32; 257];
    let mut num_buckets = 0u32;
    for i in 0..256 {
        num_buckets += (counts[i] > 0) as u32;
        offsets[i + 1] = offsets[i] + counts[i];
    }

    // All rows share the same byte — no scatter needed, roles unchanged.
    if num_buckets == 1 {
        msd_radix_sort(src, dst, bytes, rows, byte_pos + 1, result_in_src);
        return;
    }

    // Scatter src → dst using the pre-extracted bytes
    let mut write_pos = offsets;
    for (i, &idx) in src.iter().enumerate() {
        let b = bytes[i] as usize;
        dst[write_pos[b] as usize] = idx;
        write_pos[b] += 1;
    }

    // Recurse with roles swapped: after scatter the data lives in dst,
    // so dst becomes the next level's src.
    for bucket in 0..256 {
        let start = offsets[bucket] as usize;
        let end = offsets[bucket + 1] as usize;
        let len = end - start;
        if len > 1 {
            msd_radix_sort(
                &mut dst[start..end],
                &mut src[start..end],
                &mut bytes[start..end],
                rows,
                byte_pos + 1,
                !result_in_src,
            );
        } else if len == 1 && result_in_src {
            // Single-element bucket: after scatter it's in dst, copy back
            // if the caller expects the result in src.
            src[start] = dst[start];
        }
        // len == 1 && !result_in_src needs nothing: the element already
        // sits in dst, which is where the caller expects it.
    }
}
Loading
Loading