From b0ad779eac784f6930fd8c315e31f01096ad58bf Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Tue, 21 Jan 2025 10:50:11 -0800 Subject: [PATCH] expose parquet chunk size --- src/daft-local-execution/src/sources/scan_task.rs | 2 ++ src/daft-parquet/src/read.rs | 8 ++++++++ src/daft-parquet/src/stream_reader.rs | 3 ++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index 6e4a76e1fe..ead23b4f62 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -253,6 +253,7 @@ async fn stream_scan_task( FileFormatConfig::Parquet(ParquetSourceConfig { coerce_int96_timestamp_unit, field_id_mapping, + chunk_size, .. }) => { let inference_options = @@ -281,6 +282,7 @@ async fn stream_scan_task( metadata, maintain_order, delete_rows, + *chunk_size, ) .await? } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 3810532b35..10a018c113 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -375,6 +375,7 @@ async fn stream_parquet_single( metadata: Option>, delete_rows: Option>, maintain_order: bool, + chunk_size: Option, ) -> DaftResult> + Send> { let field_id_mapping_provided = field_id_mapping.is_some(); let columns_to_return = columns.map(|s| s.iter().map(|s| (*s).to_string()).collect_vec()); @@ -417,6 +418,7 @@ async fn stream_parquet_single( metadata, maintain_order, io_stats, + chunk_size, ) .await } else { @@ -427,6 +429,9 @@ async fn stream_parquet_single( field_id_mapping, ) .await?; + + let builder = builder.set_chunk_size(chunk_size); + let builder = builder.set_infer_schema_options(schema_infer_options); let builder = if let Some(columns) = &columns_to_read { @@ -873,6 +878,7 @@ pub async fn stream_parquet( metadata: Option>, maintain_order: bool, delete_rows: Option>, + chunk_size: Option, ) -> DaftResult>> { let stream = stream_parquet_single( uri.to_string(), @@ -887,6 +893,7 @@ pub async fn stream_parquet( metadata, delete_rows, maintain_order, + chunk_size, ) .await?; Ok(Box::pin(stream)) @@ -1169,6 +1176,7 @@ mod tests { None, false, None, + None, ) .await? .collect::>() diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs index d2ed694869..8ece492908 100644 --- a/src/daft-parquet/src/stream_reader.rs +++ b/src/daft-parquet/src/stream_reader.rs @@ -599,11 +599,12 @@ pub async fn local_parquet_stream( metadata: Option>, maintain_order: bool, io_stats: Option, + chunk_size: Option, ) -> DaftResult<( Arc, BoxStream<'static, DaftResult>, )> { - let chunk_size = PARQUET_MORSEL_SIZE; + let chunk_size = chunk_size.unwrap_or(PARQUET_MORSEL_SIZE); let (metadata, schema_ref, row_ranges, column_iters) = local_parquet_read_into_column_iters( uri, columns.as_deref(),