diff --git a/src/daft-csv/src/local.rs b/src/daft-csv/src/local.rs
index 7a91fa5325..8dd3f4f927 100644
--- a/src/daft-csv/src/local.rs
+++ b/src/daft-csv/src/local.rs
@@ -252,23 +252,21 @@ pub async fn stream_csv_local(
     );
 
     // Create CSV buffer pool.
-    let n_threads: usize = std::thread::available_parallelism()
-        .unwrap_or(NonZeroUsize::new(2).unwrap())
-        .into();
     let record_buffer_size = (estimated_mean_row_size + estimated_std_row_size).ceil() as usize;
     let chunk_size = read_options
         .as_ref()
         .and_then(|opt| opt.chunk_size.or_else(|| opt.buffer_size.map(|bs| bs / 8)))
         .unwrap_or(DEFAULT_CSV_BUFFER_SIZE);
     let chunk_size_rows = (chunk_size as f64 / record_buffer_size as f64).ceil() as usize;
-    // TODO(desmond): We might consider creating per-process buffer pools and slab pools.
     let buffer_pool = Arc::new(CsvBufferPool::new(
         record_buffer_size,
         num_fields,
         chunk_size_rows,
-        n_threads * 2,
     ));
+    let n_threads: usize = std::thread::available_parallelism()
+        .unwrap_or(NonZeroUsize::new(2).unwrap())
+        .into();
 
     stream_csv_as_tables(
         file,
         buffer_pool,
diff --git a/src/daft-csv/src/local/pool.rs b/src/daft-csv/src/local/pool.rs
index c07ebb14bf..cb1fa8c1ae 100644
--- a/src/daft-csv/src/local/pool.rs
+++ b/src/daft-csv/src/local/pool.rs
@@ -8,8 +8,6 @@ use parking_lot::{Mutex, RwLock};
 
 // The default size of a slab used for reading CSV files in chunks. Currently set to 4 MiB. This can be tuned.
 pub const SLABSIZE: usize = 4 * 1024 * 1024;
-// The default number of slabs in a slab pool. With 20 slabs, we reserve a total of 80 MiB of memory for reading file data.
-const SLABPOOL_DEFAULT_SIZE: usize = 20;
 
 #[derive(Clone, Debug, Default)]
 pub struct CsvSlab(Vec<ByteRecord>);
@@ -53,16 +51,10 @@ pub struct CsvBuffer {
 }
 
 impl CsvBufferPool {
-    pub fn new(
-        record_size: usize,
-        num_fields: usize,
-        num_rows: usize,
-        initial_pool_size: usize,
-    ) -> Self {
-        let chunk_buffers =
-            vec![CsvSlab::new(record_size, num_fields, num_rows); initial_pool_size];
+    pub fn new(record_size: usize, num_fields: usize, num_rows: usize) -> Self {
         Self {
-            buffers: Mutex::new(chunk_buffers),
+            // We start off with an empty pool. Buffers will be allocated on demand.
+            buffers: Mutex::new(vec![]),
             record_size,
             num_fields,
             num_rows,
@@ -108,14 +100,9 @@ pub struct FileSlabPool {
 
 impl FileSlabPool {
     pub fn new() -> Arc<Self> {
-        let slabs: Vec<RwLock<FileSlabState>> = (0..SLABPOOL_DEFAULT_SIZE)
-            // We get uninitialized buffers because we will always populate the buffers with a file read before use.
-            .map(|_| Box::new_uninit_slice(SLABSIZE))
-            .map(|x| unsafe { x.assume_init() })
-            .map(|buffer| RwLock::new(FileSlabState::new(buffer, 0)))
-            .collect();
         Arc::new(Self {
-            slabs: Mutex::new(slabs),
+            // We start off with an empty pool. Slabs will be allocated on demand.
+            slabs: Mutex::new(vec![]),
         })
     }
 
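
Note: the common thread in both files is that the pools now start empty and allocate lazily instead of reserving memory at construction. As a rough illustration of that pattern — this is a minimal sketch, not code from this PR; `Pool`, `get`, and `put` are hypothetical names, and the real `CsvBufferPool`/`FileSlabPool` carry extra state such as record sizing — the pop-or-allocate shape looks like:

    use std::sync::Arc;

    use parking_lot::Mutex;

    // A minimal sketch of an on-demand pool: it begins empty and only
    // allocates when a caller asks for an item and none is available.
    struct Pool<T> {
        items: Mutex<Vec<T>>,
    }

    impl<T> Pool<T> {
        fn new() -> Arc<Self> {
            // Start with an empty pool: no memory is reserved up front.
            Arc::new(Self {
                items: Mutex::new(vec![]),
            })
        }

        // Reuse a pooled item if one is available, otherwise allocate on demand
        // using the caller-supplied constructor.
        fn get(&self, make: impl FnOnce() -> T) -> T {
            self.items.lock().pop().unwrap_or_else(make)
        }

        // Return an item to the pool so later `get` calls can reuse it.
        fn put(&self, item: T) {
            self.items.lock().push(item);
        }
    }

Compared to the previous behavior — eagerly allocating `n_threads * 2` CSV buffers plus `SLABPOOL_DEFAULT_SIZE` (20) slabs of `SLABSIZE` (4 MiB), i.e. 80 MiB of slab memory per pool — this trades a one-time allocation on first use for a near-zero idle footprint, while recycled items still amortize allocation cost on the hot path.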