bug: Fix memory reservation and allocation problems for SortExec #14644
Conversation
Fix batch memory consumption growth after sorting; Reserve memory more aggressively to compensate for memory needed for merging.
let value = self.sort.expr.evaluate(batch)?;
let array = value.into_array(batch.num_rows())?;
let size_in_mem = array.get_buffer_memory_size();
let array = array.as_any().downcast_ref::<T>().expect("field values");
Ok(ArrayValues::new(self.sort.options, array))
let mut array_reservation = self.reservation.new_empty();
array_reservation.try_grow(size_in_mem)?;
Ok(ArrayValues::new(
    self.sort.options,
    array,
    array_reservation,
))
I think this reservation is needed.
When the sort expression is a simple column reference, the array simply reuses the buffer in batch; this is the case where the reservation is not needed. However, the sort expression can be a complex expression such as l_linenumber + 1, and in that case the result of evaluation takes additional space. Always reserving array is a more conservative approach that prevents allocations from overshooting the limit.
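To make the reservation pattern above concrete, here is a minimal standalone sketch of the same new_empty() + try_grow() flow using DataFusion's public memory-pool API; the pool size and the array standing in for an evaluated sort key are made up for illustration.

```rust
use std::sync::Arc;

use arrow::array::{Array, Int64Array};
use datafusion::error::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> Result<()> {
    // Hypothetical 1 MiB pool standing in for the query's memory limit.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let reservation = MemoryConsumer::new("sort-keys").register(&pool);

    // Pretend this array is the result of evaluating a sort expression such as
    // l_linenumber + 1: it owns fresh buffers instead of reusing the batch's.
    let array = Int64Array::from_iter_values(0..10_000);

    // Reserve the array's buffer size in a child reservation, mirroring the
    // new_empty() + try_grow() pattern in the diff above.
    let mut array_reservation = reservation.new_empty();
    array_reservation.try_grow(array.get_buffer_memory_size())?;

    println!("reserved {} bytes", array_reservation.size());
    Ok(())
}
```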
.with_reservation(self.reservation.new_empty())
.with_reservation(self.merge_reservation.new_empty())
I believe we should use merge_reservation here, because the allocations happening in the stream built here are for merging.
Thanks @Kontinuation, this looks incredibly helpful.
cc @tustvold @Dandandan @crepererum
@andygrove cc, as this ticket is directly related to Comet working on cloud instances
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
// Leave at least 1/3 of spill reservation for sort/merge the next batch. Here the
// 1/3 is simply an arbitrary chosen number.
I'm wondering, should we make this number configurable as a DataFusion parameter? I feel that depending on the data this number can fluctuate; it might be different for short, plain data vs. deeply nested, wide rows.
I found that SortPreservingMergeStream will never reserve memory after producing the first merged batch when merging in-memory batches, so there's no need to reserve additional space for merging in this place. I have removed the additional memory reservation in a new commit.
Yes, this feature is quite bug-prone; perhaps we should mark it as experimental to prevent someone from using it in production. Thank you so much for the efforts. Here are my thoughts on the changes:
The 2X memory problem is: a query configured to run under 100M of memory shows 200M of measured physical memory (though the reality is even worse than 2X 🤦🏼, see #14142).
I get the high-level idea of the more conservative memory accounting in point 3 (it is used to also account for merged batches), but I get lost in the memory estimation details of the implementation (especially whether this 2X estimation amplification is only for merged batches, or also for intermediate
I think the buffer resizing mechanism is not doubling each time; the default policy will allocate new constant-size buffers (https://docs.rs/arrow-array/54.1.0/src/arrow_array/builder/generic_bytes_view_builder.rs.html#120-122), so this change might not help.
Actually it helps. I have added a new test case. Here is where the 2X buffer growth comes from:
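As a standalone illustration of the doubling effect (a sketch, not the arrow-rs code path referenced above), the following builds a string array through a growable builder and compares the payload size with the allocated buffer size:

```rust
use arrow::array::{Array, StringBuilder};

fn main() {
    // Append variable-length strings without pre-sizing the builder; the
    // underlying value buffer grows by doubling, so its capacity can end up
    // close to 2X the bytes actually stored.
    let mut builder = StringBuilder::new();
    let mut payload_bytes = 0usize;
    for i in 0..100_000 {
        let s = format!("string-{i}string-{i}string-{i}");
        payload_bytes += s.len();
        builder.append_value(s);
    }
    let array = builder.finish();

    println!("payload bytes:      {payload_bytes}");
    println!("buffer memory size: {}", array.get_buffer_memory_size());
    // The gap between the two numbers is the over-allocation that shrinking
    // (or accounting for up to 2X) has to cover.
}
```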
The 2X amplification is mainly for intermediate batches. Let's assume that each batch is 10 MB, and we have a 100 MB memory budget. The following diagram shows how the memory consumption reaches 100 MB when performing merging. Here is the detailed explanation:
BTW, the 2X amplification for intermediate
This can be worked around by configuring a larger
I had another interesting observation: a spilling sort can be faster than a memory-unbounded sort in DataFusion. I tried running sort-tpch Q3 using this PR with #14642 cherry-picked onto it, and configured
When running without a memory limit, we merge tons of small sorted streams, which seems bad for performance. A memory limit forces us to merge before ingesting all the batches, so we do several smaller merges first and a final merge at the end to produce the result set. Coalescing batches into larger streams before merging seems to be a good idea.
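A rough sketch of that coalescing idea, assuming arrow's concat_batches kernel; coalesce_sorted_batches and target_rows are hypothetical names, not code from this PR:

```rust
use arrow::array::RecordBatch;
use arrow::compute::concat_batches;
use arrow::error::ArrowError;
use arrow_schema::SchemaRef;

/// Coalesce many small sorted batches (contiguous runs of one sorted stream)
/// into fewer, larger batches before wrapping them into streams for merging.
fn coalesce_sorted_batches(
    schema: &SchemaRef,
    batches: Vec<RecordBatch>,
    target_rows: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut out = Vec::new();
    let mut pending: Vec<RecordBatch> = Vec::new();
    let mut pending_rows = 0;

    for batch in batches {
        pending_rows += batch.num_rows();
        pending.push(batch);
        if pending_rows >= target_rows {
            // Concatenation preserves row order, so each coalesced batch stays
            // sorted as long as the inputs were consecutive sorted batches.
            out.push(concat_batches(schema, &pending)?);
            pending.clear();
            pending_rows = 0;
        }
    }
    if !pending.is_empty() {
        out.push(concat_batches(schema, &pending)?);
    }
    Ok(out)
}
```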
Not directly related to the point of this PR, but regarding this, I hope apache/datafusion-comet#1369 helps. This may cause more spilling, but I expect the chance of OOM is reduced. Still, accurate tracking of memory is important; otherwise mis-tracking still causes OOM.
use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
build_parquet();
let env = RuntimeEnvBuilder::new()
.with_disk_manager(DiskManagerConfig::default())
.with_memory_pool(Arc::new(FairSpillPool::new(100 * 1024 * 1024)))
.build_arc()
.unwrap();
let mut config = SessionConfig::new().with_sort_spill_reservation_bytes(32 * 1024 * 1024);
config.options_mut().execution.parquet.schema_force_view_types = false;
let ctx = SessionContext::new_with_config_rt(config, env);
ctx.register_parquet(
"big_strings",
"/tmp/big_strings.parquet",
ParquetReadOptions::default(),
)
.await
.unwrap();
let sql = "SELECT * FROM big_strings ORDER BY strings";
println!("Sorting strings");
ctx.sql(sql)
.await
.unwrap()
.execute_stream()
.await
.unwrap()
.try_for_each(|_| std::future::ready(Ok(())))
.await
.unwrap();
}
fn build_parquet() {
if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
println!("Using existing file at /tmp/big_strings.parquet");
return;
}
println!("Generating test file at /tmp/big_strings.parquet");
let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
let schema = Arc::new(Schema::new(vec![Field::new(
"strings",
DataType::Utf8,
false,
)]));
let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();
for batch_idx in 0..100 {
println!("Generating batch {} of 100", batch_idx);
let mut string_array_builder =
StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
for i in 0..(1024 * 1024) {
string_array_builder
.append_value(format!("string-{}string-{}string-{}", i, i, i));
}
let array = Arc::new(string_array_builder.finish());
let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();
}

called `Result::unwrap()` on an `Err` value: ResourcesExhausted("Failed to allocate additional 353536 bytes for ExternalSorterMerge[1] with 22948928 bytes already allocated for this reservation - 127190 bytes remain available for the total pool")

Thank you @kazuyukitanimura for the PR. I applied the PR to try to fix the test, but the above test still fails for me; I am not sure if I am missing something.
There are 2 problems:
One possible fix for problem 2 is to use a smaller batch size when writing batches to spill files, so that the unspillable memory required for the final spill-read merging will be smaller. Or we simply leave this problem as is and require the user to raise the memory limit.
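A sketch of the smaller-spill-batch idea, with arrow's IPC FileWriter standing in for the spill writer and an arbitrary chunk size; this is an illustration, not the PR's implementation:

```rust
use std::fs::File;

use arrow::array::RecordBatch;
use arrow::error::ArrowError;
use arrow::ipc::writer::FileWriter;

/// Write a sorted batch to a spill file in smaller chunks so that reading it
/// back for the final spill-read merge needs less unspillable memory per batch.
fn spill_in_chunks(
    writer: &mut FileWriter<File>,
    batch: &RecordBatch,
    max_rows_per_chunk: usize,
) -> Result<(), ArrowError> {
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = max_rows_per_chunk.min(batch.num_rows() - offset);
        // `slice` is zero-copy; each chunk becomes its own IPC message in the
        // spill file, so it can be read back as a smaller batch later.
        writer.write(&batch.slice(offset, len))?;
        offset += len;
    }
    Ok(())
}
```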
Thank you @Kontinuation for the good explanation, it makes sense to me; I will try it. And for problem 2, is it possible to introduce a spillable merging phase? Would that be safer?
self.sort_or_spill_in_mem_batches().await?;
// We've already freed more than half of reserved memory,
// so we can grow the reservation again. There's nothing we can do
// if this try_grow fails.
Reserves more memory for ingested batches to leave some room for merging. This PR reserves 2X memory for each batch, which works for most of the queries in the sort-tpch benchmark (all except Q5 and Q8). Users still have to configure sort_spill_reservation_bytes when the memory reserved is not big enough for merging. I don't think it is a good change, but this is the only solution I can think of to compensate for the extra memory usage for the row representation of sorted columns.
Is it possible to make the merging phase also spillable?
We can implement a more complicated multi-stage merging phase which merges a small portion of streams each time, and perform multiple rounds of merges to produce the final merged stream. However this approach is quite complex and out of the scope of this PR. I believe that there should be dedicated discussions around this problem.
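For reference, a high-level sketch of what such a multi-pass merge could look like; SpillFile, merge_spill_files, and fan_in are placeholders, not DataFusion APIs:

```rust
/// Placeholder handle for one sorted spill file.
struct SpillFile;

/// Placeholder for a bounded k-way merge of at most `fan_in` files, so each
/// round only needs memory for `fan_in` read buffers plus one write buffer.
fn merge_spill_files(_inputs: &[SpillFile]) -> SpillFile {
    SpillFile
}

/// Repeatedly merge groups of spill files until a single sorted file remains.
fn multi_pass_merge(mut files: Vec<SpillFile>, fan_in: usize) -> SpillFile {
    assert!(fan_in >= 2, "each round must merge at least two files");
    while files.len() > 1 {
        // Each round reduces the number of spill files by roughly `fan_in`x.
        let next: Vec<SpillFile> = files.chunks(fan_in).map(merge_spill_files).collect();
        files = next;
    }
    files.pop().expect("at least one spill file")
}
```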
I agree, we can add more dedicated discussions around this problem.
We can implement a more complicated multi-stage merging phase which merges a small portion of streams each time, and perform multiple rounds of merges to produce the final merged stream. However this approach is quite complex and out of the scope of this PR. I believe that there should be dedicated discussions around this problem.
I agree we should implement such a feature as a dedicated, follow on PR / Project.
@zhuqi-lucas or @Kontinuation, could one of you file a ticket to track the work? I think especially highlighting the cases where the current code won't work well is important.
It sounds like there are several issues:
- Ensuring we can always spill data (right now spilling will sometimes fail if we run out of memory to sort the batches in)
- Ensuring we can always merge the data that was spilled, even if it has a really wide fanout (like 1000s of spill files)
I think problem 1 could be solved by potentially spilling unsorted batches (and then sorting them separately). This would be less efficient (we read/write some tuples twice) but it would work.
Problem 2 could use the multi-pass merge that @Kontinuation describes
Updated: it works after changing to 1 partition and increasing the memory limit.
// data types due to exponential growth when building the sort columns. We shrink the columns
// to prevent memory reservation failures, as well as excessive memory allocation when running
// merges in `SortPreservingMergeStream`.
columns.iter_mut().for_each(|c| {
Will it be expensive to do this for all columns of each batch, or can we filter out exactly the columns that need to shrink?
It seems you already benchmarked it; maybe we can add the benchmark code as well.
This shrink_to_fit is basically a no-op for primitive-type columns produced by take_arrays because their internal buffer already has the right capacity, and it will only perform reallocations for columns with variable-length data, so I don't think there's a need to cherry-pick which columns to shrink.
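A small self-contained sketch of the effect, assuming arrow-rs >= 54 where Array::shrink_to_fit is available; the builder capacity is deliberately oversized so the before/after difference is visible:

```rust
use arrow::array::{Array, StringBuilder};

fn main() {
    // Build a string column whose value buffer is over-allocated; the data
    // capacity below is a deliberately generous, made-up guess.
    let mut builder = StringBuilder::with_capacity(1024, 1024 * 1024);
    for i in 0..1024 {
        builder.append_value(format!("string-{i}"));
    }
    let mut column = builder.finish();

    let before = column.get_array_memory_size();
    // For primitive columns produced by take_arrays this is essentially a
    // no-op; for variable-length columns it reallocates buffers down to the
    // bytes actually used.
    column.shrink_to_fit();
    let after = column.get_array_memory_size();

    println!("before: {before} bytes, after: {after} bytes");
}
```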
I've also benchmarked sorting using utf8 columns and have not observed significant performance overhead:
merge sorted utf8 low cardinality
time: [3.8824 ms 3.8903 ms 3.8987 ms]
change: [-2.8027% -2.1227% -1.5573%] (p = 0.00 < 0.05)
Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
3 (3.00%) high mild
1 (1.00%) high severe
sort merge utf8 low cardinality
time: [4.2295 ms 4.2360 ms 4.2430 ms]
change: [+0.5975% +0.8722% +1.1242%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 15 outliers among 100 measurements (15.00%)
2 (2.00%) low mild
5 (5.00%) high mild
8 (8.00%) high severe
sort utf8 low cardinality
time: [6.4265 ms 6.4369 ms 6.4483 ms]
change: [-0.3276% -0.0658% +0.1908%] (p = 0.62 > 0.05)
No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
5 (5.00%) high mild
2 (2.00%) high severe
sort partitioned utf8 low cardinality
time: [343.87 µs 347.16 µs 351.07 µs]
change: [-1.1291% +0.3066% +1.8160%] (p = 0.68 > 0.05)
No change in performance detected.
Found 15 outliers among 100 measurements (15.00%)
5 (5.00%) high mild
10 (10.00%) high severe
merge sorted utf8 high cardinality
time: [5.9968 ms 6.0083 ms 6.0207 ms]
change: [-1.9398% -1.6215% -1.2928%] (p = 0.00 < 0.05)
Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
7 (7.00%) high mild
1 (1.00%) high severe
sort merge utf8 high cardinality
time: [6.4266 ms 6.4399 ms 6.4558 ms]
change: [-0.5594% -0.2292% +0.1020%] (p = 0.19 > 0.05)
No change in performance detected.
Found 6 outliers among 100 measurements (6.00%)
2 (2.00%) high mild
4 (4.00%) high severe
sort utf8 high cardinality
time: [7.7403 ms 7.7541 ms 7.7693 ms]
change: [-2.7779% -2.1541% -1.6176%] (p = 0.00 < 0.05)
Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
2 (2.00%) high mild
4 (4.00%) high severe
sort partitioned utf8 high cardinality
time: [364.21 µs 370.21 µs 376.41 µs]
change: [+2.2461% +4.2833% +6.3333%] (p = 0.00 < 0.05)
Performance has regressed.
sort, sort_tpch and tpch10 benchmarks also showed not much difference in performance.
Comparing main and fix-sort-mem-usage-reserve-mem-for-sort-merging
--------------------
Benchmark sort.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query ┃ main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Qsort utf8 │ 28514.66ms │ 28451.16ms │ no change │
│ Qsort int │ 29749.57ms │ 29879.78ms │ no change │
│ Qsort │ 28608.24ms │ 29085.31ms │ no change │
│ decimal │ │ │ │
│ Qsort │ 31013.98ms │ 31126.24ms │ no change │
│ integer │ │ │ │
│ tuple │ │ │ │
│ Qsort utf8 │ 28925.23ms │ 29281.38ms │ no change │
│ tuple │ │ │ │
│ Qsort mixed │ 30579.63ms │ 30550.25ms │ no change │
│ tuple │ │ │ │
└──────────────┴────────────┴─────────────────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main) │ 177391.31ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 178374.12ms │
│ Average Time (main) │ 29565.22ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 29729.02ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 6 │
└────────────────────────────────────────────────────────────────┴─────────────┘
--------------------
Benchmark sort_tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query ┃ main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Q1 │ 187.27ms │ 187.75ms │ no change │
│ Q2 │ 154.92ms │ 157.82ms │ no change │
│ Q3 │ 885.18ms │ 893.74ms │ no change │
│ Q4 │ 184.50ms │ 189.54ms │ no change │
│ Q5 │ 315.13ms │ 322.19ms │ no change │
│ Q6 │ 335.00ms │ 338.65ms │ no change │
│ Q7 │ 584.88ms │ 594.44ms │ no change │
│ Q8 │ 452.66ms │ 460.51ms │ no change │
│ Q9 │ 472.15ms │ 475.38ms │ no change │
│ Q10 │ 681.58ms │ 685.07ms │ no change │
└──────────────┴──────────┴─────────────────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 4253.28ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 4305.10ms │
│ Average Time (main) │ 425.33ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 430.51ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 10 │
└────────────────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark sort_tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2617.74ms │ 2652.65ms │ no change │
│ Q2 │ 2019.64ms │ 2034.08ms │ no change │
│ Q3 │ 10748.55ms │ 11028.78ms │ no change │
│ Q4 │ 2565.69ms │ 2581.39ms │ no change │
│ Q5 │ 3182.88ms │ 3226.93ms │ no change │
│ Q6 │ 3379.76ms │ 3432.35ms │ no change │
│ Q7 │ 7200.46ms │ 7245.30ms │ no change │
│ Q8 │ 4932.09ms │ 5133.81ms │ no change │
│ Q9 │ 5488.64ms │ 5473.89ms │ no change │
│ Q10 │ 18188.22ms │ 17129.05ms │ +1.06x faster │
└──────────────┴────────────┴─────────────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 60323.67ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 59938.23ms │
│ Average Time (main) │ 6032.37ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 5993.82ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 9 │
└────────────────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1 │ 1003.40ms │ 974.06ms │ no change │
│ QQuery 2 │ 142.90ms │ 142.05ms │ no change │
│ QQuery 3 │ 437.35ms │ 429.21ms │ no change │
│ QQuery 4 │ 218.20ms │ 219.31ms │ no change │
│ QQuery 5 │ 638.99ms │ 633.81ms │ no change │
│ QQuery 6 │ 152.49ms │ 151.94ms │ no change │
│ QQuery 7 │ 937.33ms │ 952.74ms │ no change │
│ QQuery 8 │ 690.88ms │ 675.75ms │ no change │
│ QQuery 9 │ 1055.28ms │ 1039.38ms │ no change │
│ QQuery 10 │ 621.41ms │ 632.68ms │ no change │
│ QQuery 11 │ 93.62ms │ 100.54ms │ 1.07x slower │
│ QQuery 12 │ 321.36ms │ 329.27ms │ no change │
│ QQuery 13 │ 442.88ms │ 434.09ms │ no change │
│ QQuery 14 │ 252.07ms │ 252.79ms │ no change │
│ QQuery 15 │ 419.63ms │ 414.17ms │ no change │
│ QQuery 16 │ 106.30ms │ 107.51ms │ no change │
│ QQuery 17 │ 1088.73ms │ 1083.62ms │ no change │
│ QQuery 18 │ 1795.68ms │ 1785.46ms │ no change │
│ QQuery 19 │ 462.31ms │ 458.10ms │ no change │
│ QQuery 20 │ 403.54ms │ 428.10ms │ 1.06x slower │
│ QQuery 21 │ 1453.76ms │ 1454.77ms │ no change │
│ QQuery 22 │ 158.43ms │ 151.23ms │ no change │
└──────────────┴───────────┴─────────────────────────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 12896.55ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 12850.56ms │
│ Average Time (main) │ 586.21ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 584.12ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 2 │
│ Queries with No Change │ 20 │
└────────────────────────────────────────────────────────────────┴────────────┘
It makes sense.
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
while let Some(batch) = sorted_stream.next().await {
This is a good point; it avoids the OOM from try_collect().
Thanks for the nice diagram, this explanation is super clear. Regarding point 3, I thought of an edge case that can cause
Edge case: let's say the input is a deduplicated
For point 4, does the memory budget to hold merged batches come from
I agree that the current implementation uses a very rough estimation, and it could be way off from the actual memory consumption. A better approach is to sort and generate the row representation of the batch right after we ingest it; then we would know the exact size of the sorted batches and their row representations held in memory. The merge phase for handling spilling could simply take over this data and perform merging without reserving more memory. However, this conflicts with some of the optimizations we did in the past:
Yes. It may come from
First of all, thank you so much @Kontinuation and @zhuqi-lucas - I think this PR is very nicely coded, and commented and is quite clear to read.
I think the only other thing left in my mind is to add a few more fuzz tests, but otherwise this is nice
@2010YOUY01 made a nice set of tests to verify that the memory accounting was accurate, but they are currently disabled. Maybe we can also run them here 🤔
cc @kazuyukitanimura
cc @westonpace as you filed #10073
@@ -54,7 +54,9 @@ async fn test_sort_10k_mem() {
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
Can you please also add some fuzz tests for the cases you are covering in this PR? Specifically:
- RecordBatches with String arrays
- RecordBatches that have multiple columns (and thus use the Row format)
Which issue does this PR close?
Rationale for this change
I had a hard time making DataFusion Comet work on cloud instances with 4 GB of memory per CPU core, partially because DataFusion is very likely to allocate more memory than reserved and run into OOM, or run into various kinds of memory reservation failures. In cases where the partition to process is larger than the available memory, we expect spilling to happen so the query runs to completion, but we got tons of failures instead. We found that operators involving SortExec, such as sort-merge join, trigger the aforementioned problems frequently.
#10073 reports that SortExec may allocate 2X more memory than it reserves (see "the second problem" in the issue), and we found that it contributed to most of the OOM cases we encountered when using Comet. We have also found several other problems related to SortExec that are critical for our memory-limited use cases, and this PR tries to address some of them.
What changes are included in this PR?
This PR contains several fixes:
1. Don't try_collect the result of merging all at once. We consume the merged batches one after another and reserve memory for each batch. Once the reservation fails we switch to "spill mode" and write all future batches into the spill file. This resolves the 2X memory allocation problem ("the second problem") reported by Memory account not adding up in SortExec #10073, as well as this comment: External sorting not working for (maybe only for string columns??) #12136 (comment)
2. shrink_to_fit every sorted batch to reduce the memory footprint of sorted batches; otherwise sorted string arrays may take 2X the original space in the worst case, due to exponential growth of MutableBuffer for storing variable-length binary values. shrink_to_fit is a no-op for primitive-type columns returned by take_arrays since they already have the right capacity, and benchmarking showed no significant performance regression for non-primitive types such as string arrays, so I think it is a good change. This resolves "the first problem" reported by Memory account not adding up in SortExec #10073.
3. Reserve more memory for ingested batches to leave some room for merging. Users still have to configure sort_spill_reservation_bytes when the memory reserved is not big enough for merging. I don't think it is a good change, but this is the only solution I can think of to compensate for the extra memory usage for the row representation of sorted columns.

The problems with SortExec are not easy to solve without introducing significant changes to the overall design; 3) of this PR is mostly a bandaid solution. I believe that the implementation needs to be revamped to make all the memory reservation/spilling behave correctly.
Are these changes tested?
Yes. It passes all the tests.
Are there any user-facing changes?
Users may find that the sort operator is more likely to spill when running with memory constraints. The old configuration they used to make the sort operator work may not be optimal after applying this PR. For instance, a user may have configured a super large sort_spill_reservation_bytes to make merging work, but this PR reduces the optimal value of sort_spill_reservation_bytes for the same workload.
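For example, the knob can be set through SessionConfig; the value below is purely illustrative:

```rust
use datafusion::prelude::SessionConfig;

fn main() {
    // Illustrative value only: after this PR the optimal reservation for the
    // same workload is typically smaller than it was before.
    let config = SessionConfig::new().with_sort_spill_reservation_bytes(16 * 1024 * 1024);
    println!(
        "sort_spill_reservation_bytes = {}",
        config.options().execution.sort_spill_reservation_bytes
    );
}
```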