From 2b71ffb7cc6cd42129e287cc14cc91a4c580c1b4 Mon Sep 17 00:00:00 2001 From: Desmond Cheong Date: Thu, 7 Nov 2024 02:12:55 -0800 Subject: [PATCH] [PERF] Remove upfront buffer allocations for local CSV reader (#3242) The local CSV reader currently makes upfront buffer allocations (80 MiB for file slabs and 80 MiB for CSV buffers). This unnecessarily blows up the read time for small CSV files which don't use so many buffers. Since the local CSV reader allocates additional buffers as needed, we can remove all upfront allocations without affecting anything else in the implementation of the reader. This speeds up reads of small files. At the same time, I benchmarked the performance of the reader against the test case described in https://github.com/Eventual-Inc/Daft/pull/3055 and found no consistent slowdown without upfront comparisons. --- src/daft-csv/src/local.rs | 8 +++----- src/daft-csv/src/local/pool.rs | 23 +++++------------------ 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/src/daft-csv/src/local.rs b/src/daft-csv/src/local.rs index 7a91fa5325..8dd3f4f927 100644 --- a/src/daft-csv/src/local.rs +++ b/src/daft-csv/src/local.rs @@ -252,23 +252,21 @@ pub async fn stream_csv_local( ); // Create CSV buffer pool. - let n_threads: usize = std::thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(2).unwrap()) - .into(); let record_buffer_size = (estimated_mean_row_size + estimated_std_row_size).ceil() as usize; let chunk_size = read_options .as_ref() .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8))) .unwrap_or(DEFAULT_CSV_BUFFER_SIZE); let chunk_size_rows = (chunk_size as f64 / record_buffer_size as f64).ceil() as usize; - // TODO(desmond): We might consider creating per-process buffer pools and slab pools. let buffer_pool = Arc::new(CsvBufferPool::new( record_buffer_size, num_fields, chunk_size_rows, - n_threads * 2, )); + let n_threads: usize = std::thread::available_parallelism() + .unwrap_or(NonZeroUsize::new(2).unwrap()) + .into(); stream_csv_as_tables( file, buffer_pool, diff --git a/src/daft-csv/src/local/pool.rs b/src/daft-csv/src/local/pool.rs index c07ebb14bf..cb1fa8c1ae 100644 --- a/src/daft-csv/src/local/pool.rs +++ b/src/daft-csv/src/local/pool.rs @@ -8,8 +8,6 @@ use parking_lot::{Mutex, RwLock}; // The default size of a slab used for reading CSV files in chunks. Currently set to 4 MiB. This can be tuned. pub const SLABSIZE: usize = 4 * 1024 * 1024; -// The default number of slabs in a slab pool. With 20 slabs, we reserve a total of 80 MiB of memory for reading file data. -const SLABPOOL_DEFAULT_SIZE: usize = 20; #[derive(Clone, Debug, Default)] pub struct CsvSlab(Vec); @@ -53,16 +51,10 @@ pub struct CsvBuffer { } impl CsvBufferPool { - pub fn new( - record_size: usize, - num_fields: usize, - num_rows: usize, - initial_pool_size: usize, - ) -> Self { - let chunk_buffers = - vec![CsvSlab::new(record_size, num_fields, num_rows); initial_pool_size]; + pub fn new(record_size: usize, num_fields: usize, num_rows: usize) -> Self { Self { - buffers: Mutex::new(chunk_buffers), + // We start off with an empty pool. Buffers will be allocated on demand. + buffers: Mutex::new(vec![]), record_size, num_fields, num_rows, @@ -108,14 +100,9 @@ pub struct FileSlabPool { impl FileSlabPool { pub fn new() -> Arc { - let slabs: Vec> = (0..SLABPOOL_DEFAULT_SIZE) - // We get uninitialized buffers because we will always populate the buffers with a file read before use. - .map(|_| Box::new_uninit_slice(SLABSIZE)) - .map(|x| unsafe { x.assume_init() }) - .map(|buffer| RwLock::new(FileSlabState::new(buffer, 0))) - .collect(); Arc::new(Self { - slabs: Mutex::new(slabs), + // We start off with an empty pool. Slabs will be allocated on demand. + slabs: Mutex::new(vec![]), }) }