Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: Fix memory reservation and allocation problems for SortExec #14644

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ async fn test_sort_10k_mem() {
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
alamb marked this conversation as resolved.
Show resolved Hide resolved
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] {
for (batch_size, should_spill) in
[(5, false), (10000, false), (20000, true), (1000000, true)]
{
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(100 * KB)
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn oom_sort() {
.with_expected_errors(vec![
"Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
])
.with_memory_limit(200_000)
.with_memory_limit(500_000)
.run()
.await
}
Expand Down Expand Up @@ -271,7 +271,8 @@ async fn sort_spill_reservation() {

// The merge operation needs extra memory for row conversion, so make the
// memory limit larger.
let mem_limit = partition_size * 2;
let mem_limit =
((partition_size * 2 + 1024) as f64 / MEMORY_FRACTION).ceil() as usize;
let test = TestCase::new()
// This query uses a different order than the input table to
// force a sort. It also needs to have multiple columns to
Expand Down Expand Up @@ -308,7 +309,8 @@ async fn sort_spill_reservation() {

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:",
"bytes for ExternalSorterMerge",
])
.with_config(config)
.run()
Expand Down
22 changes: 21 additions & 1 deletion datafusion/physical-plan/src/sorts/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,22 @@ pub struct ArrayValues<T: CursorValues> {
// Otherwise, the first null index
null_threshold: usize,
options: SortOptions,

/// Tracks the memory used by the values array,
/// freed on drop.
_reservation: MemoryReservation,
}

impl<T: CursorValues> ArrayValues<T> {
/// Create a new [`ArrayValues`] from the provided `values` sorted according
/// to `options`.
///
/// Panics if the array is empty
pub fn new<A: CursorArray<Values = T>>(options: SortOptions, array: &A) -> Self {
pub fn new<A: CursorArray<Values = T>>(
options: SortOptions,
array: &A,
reservation: MemoryReservation,
) -> Self {
assert!(array.len() > 0, "Empty array passed to FieldCursor");
let null_threshold = match options.nulls_first {
true => array.null_count(),
Expand All @@ -309,6 +317,7 @@ impl<T: CursorValues> ArrayValues<T> {
values: array.values(),
null_threshold,
options,
_reservation: reservation,
}
}

Expand Down Expand Up @@ -360,6 +369,12 @@ impl<T: CursorValues> CursorValues for ArrayValues<T> {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryPool,
};

use super::*;

fn new_primitive(
Expand All @@ -372,10 +387,15 @@ mod tests {
false => values.len() - null_count,
};

let memory_pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(10000));
let consumer = MemoryConsumer::new("test");
let reservation = consumer.register(&memory_pool);

let values = ArrayValues {
values: PrimitiveValues(values),
null_threshold,
options,
_reservation: reservation,
};

Cursor::new(values)
Expand Down
Loading
Loading