bug: Fix memory reservation and allocation problems for SortExec #14644

Open · wants to merge 2 commits into main
Conversation

Member

@Kontinuation Kontinuation commented Feb 13, 2025

Which issue does this PR close?

Rationale for this change

I had a hard time making DataFusion Comet work on cloud instances with 4 GB of memory per CPU core, partly because DataFusion is very likely to allocate more memory than it has reserved and run into OOM, or to run into various kinds of memory reservation failures. When the partition being processed is larger than the available memory, we expect spilling to kick in so the query can run to completion, but instead we got tons of failures. We found that operators involving SortExec, such as sort-merge join, trigger the aforementioned problems frequently.

#10073 reports that SortExec may allocate 2X the memory it reserves (see "the second problem" in the issue), and we found that this contributed to most of the OOM cases we encountered when using Comet. We have also found several other problems related to SortExec that are critical for our memory-limited use cases, and this PR addresses some of them.

What changes are included in this PR?

This PR contains several fixes:

  1. Don't try_collect the result of merging all at once. We consume the merged batches one after another and reserve memory for each batch. Once a reservation fails, we switch to "spill mode" and write all future batches into the spill file (see the sketch after this list). This resolves the 2X memory allocation problem ("the second problem") reported by Memory account not adding up in SortExec #10073, as well as this comment: External sorting not working for (maybe only for string columns??) #12136 (comment)
  2. shrink_to_fit every sorted batch to reduce its memory footprint; otherwise sorted string arrays may take up to 2X the original space in the worst case, due to the exponential growth of the MutableBuffer used for storing variable-length binary values. shrink_to_fit is a no-op for the primitive-type columns returned by take_arrays since they already have the right capacity, and benchmarking showed no significant performance regression for non-primitive types such as string arrays, so I think it is a good change. This resolves "the first problem" reported by Memory account not adding up in SortExec #10073.
  3. Reserve more memory for ingested batches to leave some room for merging. This PR reserves 2X memory for each batch, which works for most of the queries in the sort-tpch benchmark (all except Q5 and Q8). Users still have to configure sort_spill_reservation_bytes when the reserved memory is not big enough for merging. I don't think it is a good change, but it is the only solution I can think of to compensate for the extra memory used by the row representation of the sorted columns.
The problems with SortExec are not easy to solve without introducing significant changes to the overall design; item 3 of this PR is mostly a band-aid solution. I believe the implementation needs to be revamped to make all the memory reservation/spilling behave correctly.

Are these changes tested?

Yes. It passes all the tests.

Are there any user-facing changes?

Users may find that the sort operator is more likely to spill when running with memory constraints. The old configuration they used to make the sort operator work may no longer be optimal after applying this PR. For instance, a user may have configured a very large sort_spill_reservation_bytes to make merging work, but this PR reduces the optimal value of sort_spill_reservation_bytes for the same workload.

@github-actions github-actions bot added the physical-expr (Physical Expressions) and core (Core DataFusion crate) labels on Feb 13, 2025
Fix batch memory consumption growth after sorting;
Reserve memory more aggressively to compensate for memory needed for merging.
@Kontinuation Kontinuation force-pushed the pr-fix-sort-mem-reservation-and-usage branch from 2f7f403 to 8cc9aea on February 13, 2025 13:18
Comment on lines 191 to +201
     let value = self.sort.expr.evaluate(batch)?;
     let array = value.into_array(batch.num_rows())?;
+    let size_in_mem = array.get_buffer_memory_size();
     let array = array.as_any().downcast_ref::<T>().expect("field values");
-    Ok(ArrayValues::new(self.sort.options, array))
+    let mut array_reservation = self.reservation.new_empty();
+    array_reservation.try_grow(size_in_mem)?;
+    Ok(ArrayValues::new(
+        self.sort.options,
+        array,
+        array_reservation,
+    ))
Member Author

I think this reservation is needed.

When the sort expression is a simple column reference, the array simply reuses the buffer of the batch, so no reservation is needed in that case. However, the sort expression can be a complex expression such as l_linenumber + 1, and the result of the evaluation takes additional space. Always reserving memory for the evaluated array is a more conservative approach that prevents allocations from overshooting the limit.
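To illustrate the point, here is a hedged, standalone example (the schema, row count and expression are made up; only col, lit, BinaryExpr and PhysicalExpr::evaluate are real DataFusion APIs):

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion::physical_expr::PhysicalExpr;

fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("l_linenumber", DataType::Int64, false)]));
    let values: ArrayRef = Arc::new(Int64Array::from_iter_values(0i64..1_000_000));
    let batch = RecordBatch::try_new(schema.clone(), vec![values])?;

    // A bare column reference evaluates to an array sharing the batch's buffer, so it
    // adds nothing beyond what the batch itself already occupies.
    let column = col("l_linenumber", &schema)?;
    let shared = column.evaluate(&batch)?.into_array(batch.num_rows())?;

    // `l_linenumber + 1` materializes a brand-new ~8 MB buffer that the batch's own
    // accounting knows nothing about; that is what the extra reservation covers.
    let plus_one = Arc::new(BinaryExpr::new(column, Operator::Plus, lit(1i64)));
    let owned = plus_one.evaluate(&batch)?.into_array(batch.num_rows())?;

    println!(
        "column ref buffers: {} bytes (shared), expression buffers: {} bytes (newly allocated)",
        shared.get_buffer_memory_size(),
        owned.get_buffer_memory_size()
    );
    Ok(())
}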

Comment on lines -372 to +359
-            .with_reservation(self.reservation.new_empty())
+            .with_reservation(self.merge_reservation.new_empty())
Member Author
@Kontinuation Kontinuation commented Feb 13, 2025

I believe we should use merge_reservation here, because the allocations happening in the stream built here are for merging.

@Kontinuation Kontinuation marked this pull request as ready for review February 13, 2025 13:27
Contributor
@comphead comphead left a comment

Thanks @Kontinuation this looks incredibly helpful
cc @tustvold @Dandandan @crepererum

@comphead
Contributor

@andygrove cc, as this ticket is directly related to Comet working on cloud instances

// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
// Leave at least 1/3 of spill reservation for sort/merge the next batch. Here the
// 1/3 is simply an arbitrary chosen number.
Contributor

I'm wondering whether we should make this number configurable as a DataFusion parameter? I feel that depending on the data this number can fluctuate; it might be different for short, plain data vs. deeply nested, wide rows.

Member Author

I found that SortPreservingMergeStream will never reserve memory after producing the first merged batch when merging in-memory batches, so there's no need to reserve additional space for merging in this place. I have removed the additional memory reservation in a new commit.

@2010YOUY01
Contributor

I had a hard time making DataFusion Comet work on cloud instances with 4 GB of memory per CPU core, partly because DataFusion is very likely to allocate more memory than it has reserved and run into OOM, or to run into various kinds of memory reservation failures.

Yes, this feature is quite bug-prone; perhaps we should mark it as experimental to prevent people from using it in production. Thank you so much for the effort.

Here are my thoughts on the changes

  1. Don't try_collect the result of merging all at once. We consume the merged batches one after another and reserve memory for each batch. Once a reservation fails, we switch to "spill mode" and write all future batches into the spill file. This resolves the 2X memory allocation problem ("the second problem") reported by Memory account not adding up in SortExec #10073, as well as this comment: External sorting not working for (maybe only for string columns??) #12136 (comment)
  1. Reserve more memory for ingested batches to leave some room for merging. This PR reserves 2X memory for each batch, which works for most of the queries in the sort-tpch benchmark (all except Q5 and Q8). Users still have to configure sort_spill_reservation_bytes when the reserved memory is not big enough for merging. I don't think it is a good change, but it is the only solution I can think of to compensate for the extra memory used by the row representation of the sorted columns.

The 2X memory problem is: a query is specified to run under 100M of memory, but the measured physical memory is 200M (though the reality is even worse than 2X 🤦🏼, see #14142).
This is caused by the following, when OOM happens for the first time:

  • there are already 1X batches in memory
  • they are then sorted and merged all at once, meaning the original batches have to be held until all sorted runs are generated (now the memory footprint is already 2X)
  • in the merging phase, the additional col->row conversion consumes some extra memory (2X+)

I get the high-level idea of the more conservative memory accounting in point 3, which is used to also account for merged batches, but I get lost in the memory estimation details of the implementation (especially whether this 2X estimation amplification is only for merged batches, or also for intermediate Rows). Could you explain with a concrete example how everything is calculated? (For example, there is an ExternalSorter with a 100M memory budget, and it will consume 10 batches, each 20M in size; how is the memory estimation calculated in each step?)

  1. shrink_to_fit every sorted batch to reduce its memory footprint; otherwise sorted string arrays may take up to 2X the original space in the worst case, due to the exponential growth of the MutableBuffer used for storing variable-length binary values. shrink_to_fit is a no-op for the primitive-type columns returned by take_arrays since they already have the right capacity, and benchmarking showed no significant performance regression for non-primitive types such as string arrays, so I think it is a good change. This resolves "the first problem" reported by Memory account not adding up in SortExec #10073.

I think the buffer resizing mechanism is not doubling each time; the default policy allocates new constant-size buffers (https://docs.rs/arrow-array/54.1.0/src/arrow_array/builder/generic_bytes_view_builder.rs.html#120-122), so this change might not help.

@Kontinuation
Member Author

  1. shrink_to_fit every sorted batch to reduce its memory footprint; otherwise sorted string arrays may take up to 2X the original space in the worst case, due to the exponential growth of the MutableBuffer used for storing variable-length binary values. shrink_to_fit is a no-op for the primitive-type columns returned by take_arrays since they already have the right capacity, and benchmarking showed no significant performance regression for non-primitive types such as string arrays, so I think it is a good change. This resolves "the first problem" reported by Memory account not adding up in SortExec #10073.

I think the buffer resizing mechanism is not doubling each time; the default policy allocates new constant-size buffers (https://docs.rs/arrow-array/54.1.0/src/arrow_array/builder/generic_bytes_view_builder.rs.html#120-122), so this change might not help.

Actually, it does help. I have added a new test case, test_sort_spill_utf8_strings, which fails if the shrink_to_fit calls are removed.

Here is where the 2X buffer growth comes from:

  1. sort_batch calls take_arrays, which calls take_bytes for string columns
  2. take_bytes allocates a MutableBuffer for storing strings taken from the input array
  3. take_bytes calls the extend_from_slice method of the values mutable buffer to append strings to the buffer, which in turn calls reserve to grow its space
  4. reserve grows the size exponentially by a factor of 2
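A small standalone illustration of that growth (the string contents and row counts are arbitrary; it just measures the physical size of a string column before and after take):

use arrow::array::{Array, StringArray, UInt32Array};
use arrow::compute::take;

fn main() {
    // A string column and a reversed permutation of indices, as sort_batch -> take_arrays
    // would produce for a descending sort.
    let strings: StringArray = (0..100_000)
        .map(|i| Some(format!("string-{i}string-{i}string-{i}")))
        .collect();
    let indices = UInt32Array::from_iter_values((0..100_000u32).rev());

    // take_bytes appends the selected values through MutableBuffer::extend_from_slice,
    // which grows the values buffer geometrically, so the taken array can carry
    // substantial unused capacity until it is shrunk.
    let taken = take(&strings, &indices, None).unwrap();
    println!(
        "original: {} bytes, after take: {} bytes",
        strings.get_array_memory_size(),
        taken.get_array_memory_size()
    );
}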

@Kontinuation
Member Author

I get the high-level idea of the more conservative memory accounting in point 3, which is used to also account for merged batches, but I get lost in the memory estimation details of the implementation (especially whether this 2X estimation amplification is only for merged batches, or also for intermediate Rows). Could you explain with a concrete example how everything is calculated? (For example, there is an ExternalSorter with a 100M memory budget, and it will consume 10 batches, each 20M in size; how is the memory estimation calculated in each step?)

The 2X amplification is mainly for intermediate Rows, not for merged batches.

Let's assume that each batch is 10 MB and we have a 100 MB memory budget. The following diagram shows how the memory consumption becomes 100 MB when performing the merge.

(diagram: datafusion-sort-merge.drawio)

Here is the detailed explanation:

  1. We reserve 2X memory for each batch on insertion, so when in_mem_batches holds 5 batches and consumes 50 MB of memory, we have already reserved 100 MB. The next insertion will trigger a merge, and possibly a spill.
  2. When the merge happens, each batch in in_mem_batches is sorted individually using sort_batch and fed into StreamingMergeBuilder to build a SortPreservingMergeStream. The batches are taken away from in_mem_batches, and each original batch is dropped immediately after its sorted counterpart is retrieved. We assume that the sorted batches have the same size as the original batches.
  3. SortPreservingMergeStream polls one batch from each sorted stream and creates a row representation or sorted array representation for it. The sorted batches are saved into in_progress and the row/sorted-array representations are saved into cursors. We assume that the row representation or sorted array has the same size as the sorted batch. Now we have consumed 100 MB.
  4. SortPreservingMergeStream produces merged batches. We can assume that the overall memory consumption remains unchanged during this process, and we certainly need to reserve memory for the merged batches. Each time we poll a merged batch from SortPreservingMergeStream, we try to reserve memory for it. If the reservation fails, all future merged batches polled from the merged stream are written directly to the spill file.
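For reference, the arithmetic of this example as a tiny sketch (the 2X factors are the assumptions stated above, not measured values):

fn main() {
    let batch = 10; // MB per ingested batch
    let budget = 100; // MB memory budget

    // Step 1: insertion reserves 2X per batch, so 5 batches (50 MB of real data)
    // exhaust the 100 MB budget and the 6th insertion triggers a merge/spill.
    let batches_before_merge = budget / (2 * batch);
    assert_eq!(batches_before_merge, 5);

    // Steps 2-3: during the merge, the sorted batches (~1X, 50 MB) plus their
    // row/cursor representation (assumed ~1X, another 50 MB) fill the whole budget.
    let merge_footprint = batches_before_merge * batch  // sorted batches
        + batches_before_merge * batch;                 // row representation
    assert_eq!(merge_footprint, budget);

    // Step 4: merged batches need memory beyond this, which is why their
    // reservations can fail and trigger spill mode.
    println!("{batches_before_merge} batches buffered, {merge_footprint} MB at merge time");
}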

@Kontinuation
Member Author

BTW, the 2X amplification for intermediate Rows seems to be conservative, but it is still not enough for some of the queries. The following query creates Rows larger than the original batches, and the query will fail when reserving memory for Rows:

-- sort-tpch Q5: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + no payload column

SELECT l_linenumber, l_suppkey, l_orderkey
FROM lineitem
ORDER BY l_linenumber, l_suppkey, l_orderkey

This can be worked around by configuring a larger sort_spill_reservation_bytes.
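For example (the 64 MB value is arbitrary and workload-dependent):

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Pick a reservation large enough for the Rows of the widest sort key in the workload.
    let config = SessionConfig::new().with_sort_spill_reservation_bytes(64 * 1024 * 1024);
    let _ctx = SessionContext::new_with_config(config);
}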

@Kontinuation
Member Author

I had another interesting observation: a spilling sort can be faster than a memory-unbounded sort in DataFusion.

I tried running sort-tpch Q3 using this PR with #14642 cherry-picked onto it, and configured parquet.schema_force_view_types = false to mitigate #12136 (comment). Here are the test results obtained on a cloud instance with an Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz:

$./target/release/dfbench sort-tpch --iterations 1 --path benchmarks/data/tpch_sf10 --memory-limit 1000M -q 3 -n1
Q3 iteration 0 took 93339.0 ms and returned 59986052 rows
Q3 avg time: 93339.00 ms
$./target/release/dfbench sort-tpch --iterations 1 --path benchmarks/data/tpch_sf10 --memory-limit 500M -q 3 -n1
Q3 iteration 0 took 81831.2 ms and returned 59986052 rows
Q3 avg time: 81831.18 ms
$./target/release/dfbench sort-tpch --iterations 1 --path benchmarks/data/tpch_sf10 --memory-limit 200M -q 3 -n1
Q3 iteration 0 took 77046.4 ms and returned 59986052 rows
Q3 avg time: 77046.36 ms
$./target/release/dfbench sort-tpch --iterations 1 --path benchmarks/data/tpch_sf10 -q 3 -n1
Q3 iteration 0 took 170416.1 ms and returned 59986052 rows
Q3 avg time: 170416.10 ms

When running without a memory limit, we merge tons of small sorted streams, which seems to be bad for performance. A memory limit forces us to merge before ingesting all the batches, so we do several smaller merges first and a final merge at the end to produce the result set. Coalescing batches into larger streams before merging seems to be a good idea.

@kazuyukitanimura
Contributor

Not directly related to the point of this PR, but regarding
I had a hard time making DataFusion Comet work on cloud instances with 4 GB of memory per CPU core, partly because DataFusion is very likely to allocate more memory than it has reserved and run into OOM, or to run into various kinds of memory reservation failures.

I hope apache/datafusion-comet#1369 helps. This may cause more spilling, but I expect the chance of OOM to be reduced. Accurate tracking of memory is still important; otherwise mis-tracking will still cause OOM.

@zhuqi-lucas
Contributor

zhuqi-lucas commented Feb 15, 2025

use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
    build_parquet();

    let env = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::default())
        .with_memory_pool(Arc::new(FairSpillPool::new(100 * 1024 * 1024)))
        .build_arc()
        .unwrap();

    let mut config = SessionConfig::new().with_sort_spill_reservation_bytes(32 * 1024 * 1024);
    config.options_mut().execution.parquet.schema_force_view_types = false;

    let ctx = SessionContext::new_with_config_rt(config, env);

    ctx.register_parquet(
        "big_strings",
        "/tmp/big_strings.parquet",
        ParquetReadOptions::default(),
    )
        .await
        .unwrap();

    let sql = "SELECT * FROM big_strings ORDER BY strings";
    println!("Sorting strings");
    ctx.sql(sql)
        .await
        .unwrap()
        .execute_stream()
        .await
        .unwrap()
        .try_for_each(|_| std::future::ready(Ok(())))
        .await
        .unwrap();
}

fn build_parquet() {
    if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
        println!("Using existing file at /tmp/big_strings.parquet");
        return;
    }
    println!("Generating test file at /tmp/big_strings.parquet");
    let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "strings",
        DataType::Utf8,
        false,
    )]));
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();

    for batch_idx in 0..100 {
        println!("Generating batch {} of 100", batch_idx);
        let mut string_array_builder =
            StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
        for i in 0..(1024 * 1024) {
            string_array_builder
                .append_value(format!("string-{}string-{}string-{}", i, i, i));
        }
        let array = Arc::new(string_array_builder.finish());
        let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
}
called `Result::unwrap()` on an `Err` value: ResourcesExhausted("Failed to allocate additional 353536 bytes for ExternalSorterMerge[1] with 22948928 bytes already allocated for this reservation - 127190 bytes remain available for the total pool")

Thank you @kazuyukitanimura for the PR. I applied it to try to fix the test, but the test above still fails for me; I am not sure if I am missing something.

@Kontinuation
Member Author

Thank you @kazuyukitanimura for the PR. I applied it to try to fix the test, but the test above still fails for me; I am not sure if I am missing something.

There are 2 problems:

  1. The DataSourceExec may have many partitions, and the SortExec on each partition only gets a fair share of the 100 MB pool, so no partition gets enough memory to operate. This is still the case even with worker_threads = 1. If you still want to sort a 4.2 GB Parquet file using 100 MB of memory, you can set .with_target_partitions(1) in your session config.
  2. 100 MB is not enough for the final merge with spill-reads. Roughly 200 spill files are generated after ingesting all the batches, and a typical batch for this workload is 352380 bytes, so the memory needed for merging is about 200 * (352380 bytes) * 2 > 100 MB. The merge phase is unspillable, so it requires a minimum amount of memory to operate. Raising the memory limit to 200 MB works for this particular workload.

One possible fix for problem 2 is to use a smaller batch size when writing batches to spill files, so that the unspillable memory required for the final spill-read merge is smaller. Or we simply leave this problem as is and require the user to raise the memory limit.
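Putting both suggestions together against the reproducer above would look roughly like this (200 MB and target_partitions(1) come straight from the two points; the rest mirrors the original snippet):

use std::sync::Arc;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Raise the pool from 100 MB to 200 MB so the unspillable final merge fits.
    let env = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::default())
        .with_memory_pool(Arc::new(FairSpillPool::new(200 * 1024 * 1024)))
        .build_arc()
        .unwrap();

    // A single partition means the lone SortExec gets the whole pool instead of a
    // fair share of it.
    let config = SessionConfig::new()
        .with_target_partitions(1)
        .with_sort_spill_reservation_bytes(32 * 1024 * 1024);

    let _ctx = SessionContext::new_with_config_rt(config, env);
}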

@zhuqi-lucas
Contributor

Thank you @kazuyukitanimura for the PR. I applied it to try to fix the test, but the test above still fails for me; I am not sure if I am missing something.

There are 2 problems:

  1. The DataSourceExec may have many partitions, and the SortExec on each partition only gets a fair share of the 100 MB pool, so no partition gets enough memory to operate. This is still the case even with worker_threads = 1. If you still want to sort a 4.2 GB Parquet file using 100 MB of memory, you can set .with_target_partitions(1) in your session config.
  2. 100 MB is not enough for the final merge with spill-reads. Roughly 200 spill files are generated after ingesting all the batches, and a typical batch for this workload is 352380 bytes, so the memory needed for merging is about 200 * (352380 bytes) * 2 > 100 MB. The merge phase is unspillable, so it requires a minimum amount of memory to operate. Raising the memory limit to 200 MB works for this particular workload.

One possible fix for problem 2 is to use a smaller batch size when writing batches to spill files, so that the unspillable memory required for the final spill-read merge is smaller. Or we simply leave this problem as is and require the user to raise the memory limit.

Thank you @Kontinuation for the good explanation, it makes sense to me; I will try it.

And for problem 2, is it possible to introduce a spillable merging phase? Would that be safer?

self.sort_or_spill_in_mem_batches().await?;
// We've already freed more than half of reserved memory,
// so we can grow the reservation again. There's nothing we can do
// if this try_grow fails.
Contributor
@zhuqi-lucas zhuqi-lucas commented Feb 15, 2025

Reserve more memory for ingested batches to leave some room for merging. This PR reserves 2X memory for each batch, which works for most of the queries in the sort-tpch benchmark (all except Q5 and Q8). Users still have to configure sort_spill_reservation_bytes when the reserved memory is not big enough for merging. I don't think it is a good change, but it is the only solution I can think of to compensate for the extra memory used by the row representation of the sorted columns.

Is it possible to make the merging phase also spillable?

Member Author

We can implement a more complicated multi-stage merge phase that merges a small portion of the streams at a time and performs multiple rounds of merging to produce the final merged stream. However, this approach is quite complex and out of the scope of this PR. I believe there should be dedicated discussions around this problem.

Contributor

I agree, we can add more dedicated discussions around this problem.

Contributor

We can implement a more complicated multi-stage merge phase that merges a small portion of the streams at a time and performs multiple rounds of merging to produce the final merged stream. However, this approach is quite complex and out of the scope of this PR. I believe there should be dedicated discussions around this problem.

I agree we should implement such a feature as a dedicated follow-on PR / project.

@zhuqi-lucas or @Kontinuation, could one of you file a ticket to track the work? I think it is especially important to highlight the cases where the current code won't work well.

It sounds like there are several issues:

  1. Ensuring we can always spill data (right now spilling will sometimes fail if we run out of memory to sort the batches in)
  2. Ensuring that we can always merge the data that was spilled, even if it had a really wide fan-out (like 1000 spill files)

I think problem 1 could be solved by potentially spilling unsorted batches (and then sorting them separately). This would be less efficient (we would read/write some tuples twice) but would work.

Problem 2 could use the multi-pass merge that @Kontinuation describes.

@zhuqi-lucas
Contributor

Thank you @kazuyukitanimura for the PR. I applied it to try to fix the test, but the test above still fails for me; I am not sure if I am missing something.

There are 2 problems:

  1. The DataSourceExec may have many partitions, and the SortExec on each partition only gets a fair share of the 100 MB pool, so no partition gets enough memory to operate. This is still the case even with worker_threads = 1. If you still want to sort a 4.2 GB Parquet file using 100 MB of memory, you can set .with_target_partitions(1) in your session config.
  2. 100 MB is not enough for the final merge with spill-reads. Roughly 200 spill files are generated after ingesting all the batches, and a typical batch for this workload is 352380 bytes, so the memory needed for merging is about 200 * (352380 bytes) * 2 > 100 MB. The merge phase is unspillable, so it requires a minimum amount of memory to operate. Raising the memory limit to 200 MB works for this particular workload.

One possible fix for problem 2 is to use a smaller batch size when writing batches to spill files, so that the unspillable memory required for the final spill-read merge is smaller. Or we simply leave this problem as is and require the user to raise the memory limit.

Updated, it works after changing to 1 partition and increasing the memory limit.

// data types due to exponential growth when building the sort columns. We shrink the columns
// to prevent memory reservation failures, as well as excessive memory allocation when running
// merges in `SortPreservingMergeStream`.
columns.iter_mut().for_each(|c| {
Contributor
@zhuqi-lucas zhuqi-lucas commented Feb 15, 2025

Will it be expensive to do this for all columns of every batch, or can we filter exactly those columns that need to be shrunk?

Contributor

It seems you have already benchmarked it; perhaps we can also include the benchmark code.

Member Author
@Kontinuation Kontinuation commented Feb 15, 2025

This shrink_to_fit is basically a no-op for primitive-type columns produced by take_arrays because their internal buffers already have the right capacity, and it only performs reallocations for columns with variable-length data, so I don't think there's a need to cherry-pick which columns to shrink.

I've also benchmarked sorting using utf8 columns and have not observed significant performance overhead:

merge sorted utf8 low cardinality
                        time:   [3.8824 ms 3.8903 ms 3.8987 ms]
                        change: [-2.8027% -2.1227% -1.5573%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe

sort merge utf8 low cardinality
                        time:   [4.2295 ms 4.2360 ms 4.2430 ms]
                        change: [+0.5975% +0.8722% +1.1242%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 15 outliers among 100 measurements (15.00%)
  2 (2.00%) low mild
  5 (5.00%) high mild
  8 (8.00%) high severe

sort utf8 low cardinality
                        time:   [6.4265 ms 6.4369 ms 6.4483 ms]
                        change: [-0.3276% -0.0658% +0.1908%] (p = 0.62 > 0.05)
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  5 (5.00%) high mild
  2 (2.00%) high severe

sort partitioned utf8 low cardinality
                        time:   [343.87 µs 347.16 µs 351.07 µs]
                        change: [-1.1291% +0.3066% +1.8160%] (p = 0.68 > 0.05)
                        No change in performance detected.
Found 15 outliers among 100 measurements (15.00%)
  5 (5.00%) high mild
  10 (10.00%) high severe

merge sorted utf8 high cardinality
                        time:   [5.9968 ms 6.0083 ms 6.0207 ms]
                        change: [-1.9398% -1.6215% -1.2928%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe

sort merge utf8 high cardinality
                        time:   [6.4266 ms 6.4399 ms 6.4558 ms]
                        change: [-0.5594% -0.2292% +0.1020%] (p = 0.19 > 0.05)
                        No change in performance detected.
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) high mild
  4 (4.00%) high severe

sort utf8 high cardinality
                        time:   [7.7403 ms 7.7541 ms 7.7693 ms]
                        change: [-2.7779% -2.1541% -1.6176%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) high mild
  4 (4.00%) high severe

sort partitioned utf8 high cardinality
                        time:   [364.21 µs 370.21 µs 376.41 µs]
                        change: [+2.2461% +4.2833% +6.3333%] (p = 0.00 < 0.05)
                        Performance has regressed.

The sort, sort_tpch and tpch_sf10 benchmarks also showed little difference in performance.

Comparing main and fix-sort-mem-usage-reserve-mem-for-sort-merging
--------------------
Benchmark sort.json
--------------------
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query               ┃       main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃    Change ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Qsort utf8          │ 28514.66ms │                                      28451.16ms │ no change │
│ Qsort int           │ 29749.57ms │                                      29879.78ms │ no change │
│ Qsort decimal       │ 28608.24ms │                                      29085.31ms │ no change │
│ Qsort integer tuple │ 31013.98ms │                                      31126.24ms │ no change │
│ Qsort utf8 tuple    │ 28925.23ms │                                      29281.38ms │ no change │
│ Qsort mixed tuple   │ 30579.63ms │                                      30550.25ms │ no change │
└─────────────────────┴────────────┴─────────────────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary                                              ┃             ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main)                                              │ 177391.31ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging)   │ 178374.12ms │
│ Average Time (main)                                            │  29565.22ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │  29729.02ms │
│ Queries Faster                                                 │           0 │
│ Queries Slower                                                 │           0 │
│ Queries with No Change                                         │           6 │
└────────────────────────────────────────────────────────────────┴─────────────┘
--------------------
Benchmark sort_tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃     main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Q1           │ 187.27ms │                                        187.75ms │ no change │
│ Q2           │ 154.92ms │                                        157.82ms │ no change │
│ Q3           │ 885.18ms │                                        893.74ms │ no change │
│ Q4           │ 184.50ms │                                        189.54ms │ no change │
│ Q5           │ 315.13ms │                                        322.19ms │ no change │
│ Q6           │ 335.00ms │                                        338.65ms │ no change │
│ Q7           │ 584.88ms │                                        594.44ms │ no change │
│ Q8           │ 452.66ms │                                        460.51ms │ no change │
│ Q9           │ 472.15ms │                                        475.38ms │ no change │
│ Q10          │ 681.58ms │                                        685.07ms │ no change │
└──────────────┴──────────┴─────────────────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                              ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                              │ 4253.28ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging)   │ 4305.10ms │
│ Average Time (main)                                            │  425.33ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │  430.51ms │
│ Queries Faster                                                 │         0 │
│ Queries Slower                                                 │         0 │
│ Queries with No Change                                         │        10 │
└────────────────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark sort_tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │  2617.74ms │                                       2652.65ms │     no change │
│ Q2           │  2019.64ms │                                       2034.08ms │     no change │
│ Q3           │ 10748.55ms │                                      11028.78ms │     no change │
│ Q4           │  2565.69ms │                                       2581.39ms │     no change │
│ Q5           │  3182.88ms │                                       3226.93ms │     no change │
│ Q6           │  3379.76ms │                                       3432.35ms │     no change │
│ Q7           │  7200.46ms │                                       7245.30ms │     no change │
│ Q8           │  4932.09ms │                                       5133.81ms │     no change │
│ Q9           │  5488.64ms │                                       5473.89ms │     no change │
│ Q10          │ 18188.22ms │                                      17129.05ms │ +1.06x faster │
└──────────────┴────────────┴─────────────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                              ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                              │ 60323.67ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging)   │ 59938.23ms │
│ Average Time (main)                                            │  6032.37ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │  5993.82ms │
│ Queries Faster                                                 │          1 │
│ Queries Slower                                                 │          0 │
│ Queries with No Change                                         │          9 │
└────────────────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │ 1003.40ms │                                        974.06ms │    no change │
│ QQuery 2     │  142.90ms │                                        142.05ms │    no change │
│ QQuery 3     │  437.35ms │                                        429.21ms │    no change │
│ QQuery 4     │  218.20ms │                                        219.31ms │    no change │
│ QQuery 5     │  638.99ms │                                        633.81ms │    no change │
│ QQuery 6     │  152.49ms │                                        151.94ms │    no change │
│ QQuery 7     │  937.33ms │                                        952.74ms │    no change │
│ QQuery 8     │  690.88ms │                                        675.75ms │    no change │
│ QQuery 9     │ 1055.28ms │                                       1039.38ms │    no change │
│ QQuery 10    │  621.41ms │                                        632.68ms │    no change │
│ QQuery 11    │   93.62ms │                                        100.54ms │ 1.07x slower │
│ QQuery 12    │  321.36ms │                                        329.27ms │    no change │
│ QQuery 13    │  442.88ms │                                        434.09ms │    no change │
│ QQuery 14    │  252.07ms │                                        252.79ms │    no change │
│ QQuery 15    │  419.63ms │                                        414.17ms │    no change │
│ QQuery 16    │  106.30ms │                                        107.51ms │    no change │
│ QQuery 17    │ 1088.73ms │                                       1083.62ms │    no change │
│ QQuery 18    │ 1795.68ms │                                       1785.46ms │    no change │
│ QQuery 19    │  462.31ms │                                        458.10ms │    no change │
│ QQuery 20    │  403.54ms │                                        428.10ms │ 1.06x slower │
│ QQuery 21    │ 1453.76ms │                                       1454.77ms │    no change │
│ QQuery 22    │  158.43ms │                                        151.23ms │    no change │
└──────────────┴───────────┴─────────────────────────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                              ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                              │ 12896.55ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging)   │ 12850.56ms │
│ Average Time (main)                                            │   586.21ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │   584.12ms │
│ Queries Faster                                                 │          0 │
│ Queries Slower                                                 │          2 │
│ Queries with No Change                                         │         20 │
└────────────────────────────────────────────────────────────────┴────────────┘

Contributor

It makes sense.

// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
while let Some(batch) = sorted_stream.next().await {
Contributor

This is a good way to avoid the OOM caused by try_collect().

@2010YOUY01
Contributor

Here is the detailed explanation:

  1. We reserve 2X memory for each batch on insertion, so when in_mem_batches holds 5 batches and consumes 50 MB of memory, we have already reserved 100 MB. The next insertion will trigger a merge, and possibly a spill.
  2. When the merge happens, each batch in in_mem_batches is sorted individually using sort_batch and fed into StreamingMergeBuilder to build a SortPreservingMergeStream. The batches are taken away from in_mem_batches, and each original batch is dropped immediately after its sorted counterpart is retrieved. We assume that the sorted batches have the same size as the original batches.
  3. SortPreservingMergeStream polls one batch from each sorted stream and creates a row representation or sorted array representation for it. The sorted batches are saved into in_progress and the row/sorted-array representations are saved into cursors. We assume that the row representation or sorted array has the same size as the sorted batch. Now we have consumed 100 MB.
  4. SortPreservingMergeStream produces merged batches. We can assume that the overall memory consumption remains unchanged during this process, and we certainly need to reserve memory for the merged batches. Each time we poll a merged batch from SortPreservingMergeStream, we try to reserve memory for it. If the reservation fails, all future merged batches polled from the merged stream are written directly to the spill file.

Thanks for the nice diagram, this explanation is super clear

Regarding point 3, I thought of an edge case that can make the Rows much larger than 1X:
During merging, only the order keys are converted to the row format.

  • If select c1, c2 from t order by c1, c2, all columns are converted to Row, so the estimation is close to 1X
  • If select c1, ... c10 from t order by c1, only 1 column is converted, so the extra Row overhead is 0.1X. (I think it is okay to over-estimate to be conservative)

Edge case: let's say the input is a deduplicated StringViewArray (like a 10k-row batch with only 100 distinct values, where the payload content is stored without duplication and the array elements just reference the payload ranges). After converting to the Row format, every row is materialized, so the Row format will have a 100X expansion.
I think we need some mechanism to deal with this kind of edge case; perhaps this also applies to the dictionary representation.
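A rough way to see this effect is to compare the array footprint with the Rows footprint; the sketch below uses a dictionary array as a stand-in for the deduplicated StringViewArray case (the row counts and payload sizes are made up):

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, DictionaryArray};
use arrow::datatypes::Int32Type;
use arrow::row::{RowConverter, SortField};

fn main() {
    // 10k rows but only 100 distinct payload strings: the dictionary stores every
    // distinct value exactly once, so the array's footprint stays small.
    let payloads: Vec<String> = (0..10_000)
        .map(|i| format!("payload-{:03}", i % 100).repeat(50))
        .collect();
    let dict: DictionaryArray<Int32Type> = payloads.iter().map(|s| s.as_str()).collect();
    let col: ArrayRef = Arc::new(dict);

    // The Row format materializes the payload for every single row, so its size can be
    // orders of magnitude larger than the array it was derived from.
    let converter = RowConverter::new(vec![SortField::new(col.data_type().clone())]).unwrap();
    let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    println!(
        "array: {} bytes, rows: {} bytes",
        col.get_array_memory_size(),
        rows.size()
    );
}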

For point 4, does the memory budget for holding merged batches come from sort_spill_reservation_bytes? The small sorted runs and converted rows should have taken up all of the memory at this stage.

@Kontinuation
Member Author

Kontinuation commented Feb 15, 2025

Edge case: let's say the input is a deduplicated StringViewArray (like a 10k-row batch with only 100 distinct values, where the payload content is stored without duplication and the array elements just reference the payload ranges). After converting to the Row format, every row is materialized, so the Row format will have a 100X expansion. I think we need some mechanism to deal with this kind of edge case; perhaps this also applies to the dictionary representation.

I agree that the current implementation uses a very rough estimation, and it could be way off from the actual memory consumption.

A better approach is to sort the batch and generate its row representation right after we ingest it; then we would know the exact size of the sorted batches and their row representations held in memory. The merge phase for handling spilling could simply take this data and perform the merge without reserving more memory. However, this conflicts with some of the optimizations we did in the past:

For point 4, does the memory budget for holding merged batches come from sort_spill_reservation_bytes? The small sorted runs and converted rows should have taken up all of the memory at this stage.

Yes. It may come from sort_spill_reservation_bytes, or from the memory freed after per-batch sorting due to the fetch option.

Contributor
@alamb alamb left a comment

First of all, thank you so much @Kontinuation and @zhuqi-lucas - I think this PR is very nicely coded and commented, and is quite clear to read.

I think the only other thing left in my mind is to add a few more fuzz tests, but otherwise this is nice

@2010YOUY01 also made a nice set of tests to verify that the memory accounting is accurate, but they are currently disabled. Maybe we can run them here too 🤔

cc @kazuyukitanimura
cc @westonpace as you filed #10073

self.sort_or_spill_in_mem_batches().await?;
// We've already freed more than half of reserved memory,
// so we can grow the reservation again. There's nothing we can do
// if this try_grow fails.
Contributor

We can implement a more complicated multi-stage merge phase that merges a small portion of the streams at a time and performs multiple rounds of merging to produce the final merged stream. However, this approach is quite complex and out of the scope of this PR. I believe there should be dedicated discussions around this problem.

I agree we should implement such a feature as a dedicated follow-on PR / project.

@zhuqi-lucas or @Kontinuation, could one of you file a ticket to track the work? I think it is especially important to highlight the cases where the current code won't work well.

It sounds like there are several issues:

  1. Ensuring we can always spill data (right now spilling will sometimes fail if we run out of memory to sort the batches in)
  2. Ensuring that we can always merge the data that was spilled, even if it had a really wide fan-out (like 1000 spill files)

I think problem 1 could be solved by potentially spilling unsorted batches (and then sorting them separately). This would be less efficient (we would read/write some tuples twice) but would work.

Problem 2 could use the multi-pass merge that @Kontinuation describes.

@@ -54,7 +54,9 @@ async fn test_sort_10k_mem() {
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
Contributor

Can you please also add some fuzz tests for the cases you are covering in this PR? Specifically:

  1. RecordBatches with String arrays
  2. RecordBatches that have multiple columns (and thus use the Row format)

Successfully merging this pull request may close these issues.

Memory account not adding up in SortExec