Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: Fix memory reservation and allocation problems for SortExec #14644

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 196 additions & 18 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::sync::Arc;

use arrow::{
array::{ArrayRef, Int32Array},
array::{as_string_array, ArrayRef, Int32Array, StringArray},
compute::SortOptions,
record_batch::RecordBatch,
};
Expand All @@ -29,6 +29,7 @@ use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::cast::as_int32_array;
use datafusion_execution::memory_pool::GreedyMemoryPool;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
Expand All @@ -42,42 +43,139 @@ const KB: usize = 1 << 10;
#[cfg_attr(tarpaulin, ignore)]
/// Fuzz-sorts randomized Int32 batches under a tight (10 KiB) memory pool and
/// verifies the output matches `Vec::sort`, both with and without spilling.
async fn test_sort_10k_mem() {
    // (batch_size, should_spill): larger inputs must spill under the 10 KiB pool.
    for (batch_size, should_spill) in [(5, false), (20000, true), (500000, true)] {
        let (input, collected) = SortTest::new()
            .with_int32_batches(batch_size)
            .with_sort_columns(vec!["x"])
            .with_pool_size(10 * KB)
            .with_should_spill(should_spill)
            .run()
            .await;

        // Compare against a reference sort of the raw input values.
        let expected = partitions_to_sorted_vec(&input);
        let actual = batches_to_vec(&collected);
        assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
    }
}

#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
alamb marked this conversation as resolved.
Show resolved Hide resolved
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] {
SortTest::new()
for (batch_size, should_spill) in
[(5, false), (10000, false), (20000, true), (1000000, true)]
{
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
.with_sort_columns(vec!["x"])
.with_pool_size(100 * KB)
.with_should_spill(should_spill)
.run()
.await;

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);
assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
}
}

#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_strings_100k_mem() {
for (batch_size, should_spill) in
[(5, false), (1000, false), (10000, true), (20000, true)]
{
let (input, collected) = SortTest::new()
.with_utf8_batches(batch_size)
.with_sort_columns(vec!["x"])
.with_pool_size(100 * KB)
.with_should_spill(should_spill)
.run()
.await;

let mut input = input
.iter()
.flat_map(|p| p.iter())
.flat_map(|b| {
let array = b.column(0);
as_string_array(array)
.iter()
.map(|s| s.unwrap().to_string())
})
.collect::<Vec<String>>();
input.sort_unstable();
let actual = collected
.iter()
.flat_map(|b| {
let array = b.column(0);
as_string_array(array)
.iter()
.map(|s| s.unwrap().to_string())
})
.collect::<Vec<String>>();
assert_eq!(input, actual);
}
}

#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
/// Fuzz-sorts batches with an Int32 column "x" and a Utf8 column "y" by
/// `(x, y)` under a 100 KiB memory pool, and verifies the output matches
/// `Vec::sort_unstable` on the flattened row tuples (tuple ordering is
/// lexicographic, matching the two-key sort), both with and without spilling.
async fn test_sort_multi_columns_100k_mem() {
    for (batch_size, should_spill) in
        [(5, false), (1000, false), (10000, true), (20000, true)]
    {
        let (input, collected) = SortTest::new()
            .with_int32_utf8_batches(batch_size)
            .with_sort_columns(vec!["x", "y"])
            .with_pool_size(100 * KB)
            .with_should_spill(should_spill)
            .run()
            .await;

        // Flatten one batch into (x, y) row tuples so rows can be compared
        // with plain `Vec` sorting.
        fn record_batch_to_vec(b: &RecordBatch) -> Vec<(i32, String)> {
            let i32_array = as_int32_array(b.column(0)).unwrap();
            let string_array = as_string_array(b.column(1));
            (0..b.num_rows())
                .map(|i| (i32_array.value(i), string_array.value(i).to_string()))
                .collect()
        }

        let mut expected = input
            .iter()
            .flat_map(|p| p.iter())
            .flat_map(record_batch_to_vec)
            .collect::<Vec<(i32, String)>>();
        expected.sort_unstable();
        let actual = collected
            .iter()
            .flat_map(record_batch_to_vec)
            .collect::<Vec<(i32, String)>>();
        assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
    }
}

#[tokio::test]
/// Fuzz-sorts randomized Int32 batches with an effectively unlimited memory
/// pool (`usize::MAX`); no case is expected to spill.
async fn test_sort_unlimited_mem() {
    for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] {
        let (input, collected) = SortTest::new()
            .with_int32_batches(batch_size)
            .with_sort_columns(vec!["x"])
            .with_pool_size(usize::MAX)
            .with_should_spill(should_spill)
            .run()
            .await;

        // Compare against a reference sort of the raw input values.
        let expected = partitions_to_sorted_vec(&input);
        let actual = batches_to_vec(&collected);
        assert_eq!(expected, actual, "failure in @ batch_size {batch_size:?}");
    }
}

#[derive(Debug, Default)]
struct SortTest {
input: Vec<Vec<RecordBatch>>,
/// The names of the columns to sort by
sort_columns: Vec<String>,
/// GreedyMemoryPool size, if specified
pool_size: Option<usize>,
/// If true, expect the sort to spill
Expand All @@ -89,12 +187,29 @@ impl SortTest {
Default::default()
}

/// Set the names of the columns the `SortExec` under test will sort by.
fn with_sort_columns(mut self, sort_columns: Vec<&str>) -> Self {
    self.sort_columns = sort_columns.into_iter().map(String::from).collect();
    self
}

/// Create batches of int32 values of rows
fn with_int32_batches(mut self, rows: usize) -> Self {
    let partition = make_staggered_i32_batches(rows);
    self.input = vec![partition];
    self
}

/// Create batches of utf8 values of rows
fn with_utf8_batches(mut self, rows: usize) -> Self {
    let partition = make_staggered_utf8_batches(rows);
    self.input = vec![partition];
    self
}

/// Create batches of int32 and utf8 values of rows
fn with_int32_utf8_batches(mut self, rows: usize) -> Self {
    let partition = make_staggered_i32_utf8_batches(rows);
    self.input = vec![partition];
    self
}

/// specify that this test should use a memory pool of the specified size
fn with_pool_size(mut self, pool_size: usize) -> Self {
self.pool_size = Some(pool_size);
Expand All @@ -108,7 +223,7 @@ impl SortTest {

/// Sort the input using SortExec and ensure the results are
/// correct according to `Vec::sort` both with and without spilling
async fn run(&self) {
async fn run(&self) -> (Vec<Vec<RecordBatch>>, Vec<RecordBatch>) {
let input = self.input.clone();
let first_batch = input
.iter()
Expand All @@ -117,16 +232,21 @@ impl SortTest {
.expect("at least one batch");
let schema = first_batch.schema();

let sort = LexOrdering::new(vec![PhysicalSortExpr {
expr: col("x", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: true,
},
}]);
let sort_ordering = LexOrdering::new(
self.sort_columns
.iter()
.map(|c| PhysicalSortExpr {
expr: col(c, &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: true,
},
})
.collect(),
);

let exec = MemorySourceConfig::try_new_exec(&input, schema, None).unwrap();
let sort = Arc::new(SortExec::new(sort, exec));
let sort = Arc::new(SortExec::new(sort_ordering, exec));

let session_config = SessionConfig::new();
let session_ctx = if let Some(pool_size) = self.pool_size {
Expand All @@ -151,9 +271,6 @@ impl SortTest {
let task_ctx = session_ctx.task_ctx();
let collected = collect(sort.clone(), task_ctx).await.unwrap();

let expected = partitions_to_sorted_vec(&input);
let actual = batches_to_vec(&collected);

if self.should_spill {
assert_ne!(
sort.metrics().unwrap().spill_count().unwrap(),
Expand All @@ -173,7 +290,8 @@ impl SortTest {
0,
"The sort should have returned all memory used back to the memory pool"
);
assert_eq!(expected, actual, "failure in @ pool_size {self:?}");

(input, collected)
}
}

Expand Down Expand Up @@ -201,3 +319,63 @@ fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
}
batches
}

/// Return randomly sized record batches in a field named 'x' of type `Utf8`
/// with randomized content
fn make_staggered_utf8_batches(len: usize) -> Vec<RecordBatch> {
    // Batches never exceed this many rows; a batch of 0 rows is allowed.
    const MAX_BATCH: usize = 1024;
    let mut rng = rand::thread_rng();

    let mut out = vec![];
    let mut rows_left = len;
    while rows_left > 0 {
        let batch_rows = rng.gen_range(0..=rows_left.min(MAX_BATCH));
        rows_left -= batch_rows;

        // Random, non-null string values for the single "x" column.
        let values: Vec<String> = (0..batch_rows)
            .map(|_| format!("test_string_{}", rng.gen::<u32>()))
            .collect();
        let column = Arc::new(StringArray::from_iter_values(values)) as ArrayRef;
        out.push(RecordBatch::try_from_iter(vec![("x", column)]).unwrap());
    }
    out
}

/// Return randomly sized record batches in a field named 'x' of type `Int32`
/// with randomized i32 content and a field named 'y' of type `Utf8`
/// with randomized content
fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
    // Batches never exceed this many rows; a batch of 0 rows is allowed.
    const MAX_BATCH: usize = 1024;
    let mut rng = rand::thread_rng();

    let mut out = vec![];
    let mut rows_left = len;
    while rows_left > 0 {
        let batch_rows = rng.gen_range(0..=rows_left.min(MAX_BATCH));
        rows_left -= batch_rows;

        // Generate the "x" values first, then the "y" values, preserving the
        // RNG draw order of the original implementation.
        let xs: Vec<i32> = (0..batch_rows).map(|_| rng.gen()).collect();
        let ys: Vec<String> = (0..batch_rows)
            .map(|_| format!("test_string_{}", rng.gen::<u32>()))
            .collect();
        out.push(
            RecordBatch::try_from_iter(vec![
                ("x", Arc::new(Int32Array::from_iter_values(xs)) as ArrayRef),
                ("y", Arc::new(StringArray::from_iter_values(ys)) as ArrayRef),
            ])
            .unwrap(),
        );
    }

    out
}
8 changes: 5 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn oom_sort() {
.with_expected_errors(vec![
"Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
])
.with_memory_limit(200_000)
.with_memory_limit(500_000)
.run()
.await
}
Expand Down Expand Up @@ -271,7 +271,8 @@ async fn sort_spill_reservation() {

// Merge operation needs extra memory to do row conversion, so make the
// memory limit larger.
let mem_limit = partition_size * 2;
let mem_limit =
((partition_size * 2 + 1024) as f64 / MEMORY_FRACTION).ceil() as usize;
let test = TestCase::new()
// This query uses a different order than the input table to
// force a sort. It also needs to have multiple columns to
Expand Down Expand Up @@ -308,7 +309,8 @@ async fn sort_spill_reservation() {

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge",
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:",
"bytes for ExternalSorterMerge",
])
.with_config(config)
.run()
Expand Down
Loading
Loading