[PERF] Predicate Pushdown for CSV Reader (#1724)
* Enables predicate pushdown and early termination in the CSV reader
* Also refactors our `FromArrow` trait to take a `FieldRef`
samster25 authored Dec 14, 2023
1 parent 6d71636 commit 34a5513
Showing 24 changed files with 457 additions and 151 deletions.
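
The two bullets above summarize the change; in isolation, the idea is: predicate pushdown evaluates the filter expression inside the CSV scan itself, so non-matching rows are dropped before they are materialized into a table, and early termination stops the scan once the requested number of matching rows has been read. The sketch below is a self-contained illustration of that control flow using plain Rust types (a closure standing in for the compiled predicate, an optional row limit); it is not Daft's actual reader code, which is spread across the files that follow.

fn filtered_scan<R>(
    rows: impl IntoIterator<Item = R>,
    predicate: impl Fn(&R) -> bool,
    limit: Option<usize>,
) -> Vec<R> {
    let mut kept = Vec::new();
    for row in rows {
        // Pushdown: apply the filter during the scan, before buffering the row.
        if !predicate(&row) {
            continue;
        }
        kept.push(row);
        // Early termination: stop reading once `limit` matching rows have been collected.
        if limit.map_or(false, |n| kept.len() >= n) {
            break;
        }
    }
    kept
}

fn main() {
    let rows = vec![1i64, 5, 7, 2, 9, 11];
    // Keep values greater than 4, but stop after the first two matches.
    assert_eq!(filtered_scan(rows, |r| *r > 4, Some(2)), vec![5, 7]);
}
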
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions daft/daft.pyi
@@ -262,13 +262,15 @@ class CsvConvertOptions:
include_columns: list[str] | None
column_names: list[str] | None
schema: PySchema | None
predicate: PyExpr | None

def __init__(
self,
limit: int | None = None,
include_columns: list[str] | None = None,
column_names: list[str] | None = None,
schema: PySchema | None = None,
predicate: PyExpr | None = None,
): ...

class CsvParseOptions:
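
The new `predicate` field is the only API change in this stub: a filter expression that the reader evaluates while scanning. A rough Rust-side mirror of the options is sketched below; the field names follow the stub, but the Rust types (`Schema` and `Expr` stand-ins, the `Arc` wrappers) are assumptions for illustration rather than daft-csv's actual definitions.

use std::sync::Arc;

// Hypothetical stand-ins for Daft's schema and expression types.
struct Schema;
struct Expr;

// Rough mirror of the CsvConvertOptions stub above. Every field is optional,
// matching the `| None` defaults in the Python signature.
#[allow(dead_code)]
struct CsvConvertOptions {
    limit: Option<usize>,                 // row limit used for early termination
    include_columns: Option<Vec<String>>, // projection: read only these columns
    column_names: Option<Vec<String>>,    // override the header row's names
    schema: Option<Arc<Schema>>,          // user-supplied schema instead of inference
    predicate: Option<Arc<Expr>>,         // filter pushed into the scan (new in this commit)
}
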
10 changes: 5 additions & 5 deletions src/daft-core/src/array/growable/arrow_growable.rs
@@ -1,4 +1,4 @@
use std::marker::PhantomData;
use std::{marker::PhantomData, sync::Arc};

use common_error::DaftResult;

@@ -46,8 +46,8 @@ where
#[inline]
fn build(&mut self) -> DaftResult<Series> {
let arrow_array = self.arrow2_growable.as_box();
let field = Field::new(self.name.clone(), self.dtype.clone());
Ok(DataArray::<T>::from_arrow(&field, arrow_array)?.into_series())
let field = Arc::new(Field::new(self.name.clone(), self.dtype.clone()));
Ok(DataArray::<T>::from_arrow(field, arrow_array)?.into_series())
}
}

@@ -207,7 +207,7 @@ impl<'a> Growable for ArrowExtensionGrowable<'a> {
#[inline]
fn build(&mut self) -> DaftResult<Series> {
let arr = self.child_growable.as_box();
let field = Field::new(self.name.clone(), self.dtype.clone());
Ok(ExtensionArray::from_arrow(&field, arr)?.into_series())
let field = Arc::new(Field::new(self.name.clone(), self.dtype.clone()));
Ok(ExtensionArray::from_arrow(field, arr)?.into_series())
}
}
18 changes: 9 additions & 9 deletions src/daft-core/src/array/ops/cast.rs
@@ -1,4 +1,4 @@
use std::iter::repeat;
use std::{iter::repeat, sync::Arc};

use super::as_arrow::AsArrow;
use crate::{
@@ -141,8 +141,8 @@ where
}
};

let new_field = Field::new(to_cast.name(), dtype.clone());
Series::from_arrow(&new_field, result_arrow_physical_array)
let new_field = Arc::new(Field::new(to_cast.name(), dtype.clone()));
Series::from_arrow(new_field, result_arrow_physical_array)
}

fn arrow_cast<T>(to_cast: &DataArray<T>, dtype: &DataType) -> DaftResult<Series>
@@ -216,8 +216,8 @@ where
)));
};

let new_field = Field::new(to_cast.name(), dtype.clone());
Series::from_arrow(&new_field, result_array)
let new_field = Arc::new(Field::new(to_cast.name(), dtype.clone()));
Series::from_arrow(new_field, result_array)
}

impl<T> DataArray<T>
@@ -739,7 +739,7 @@ fn extract_python_like_to_fixed_size_list<
);

FixedSizeListArray::from_arrow(
&Field::new(python_objects.name(), daft_type),
Arc::new(Field::new(python_objects.name(), daft_type)),
Box::new(list_array),
)
}
@@ -776,7 +776,7 @@ fn extract_python_like_to_list<
);

ListArray::from_arrow(
&Field::new(python_objects.name(), daft_type),
Arc::new(Field::new(python_objects.name(), daft_type)),
Box::new(list_arrow_array),
)
}
@@ -939,12 +939,12 @@ fn extract_python_like_to_tensor_array<
Field::new(name, physical_type),
vec![
ListArray::from_arrow(
&Field::new("data", data_array.data_type().into()),
Arc::new(Field::new("data", data_array.data_type().into())),
data_array,
)?
.into_series(),
ListArray::from_arrow(
&Field::new("shape", shapes_array.data_type().into()),
Arc::new(Field::new("shape", shapes_array.data_type().into())),
shapes_array,
)?
.into_series(),
28 changes: 16 additions & 12 deletions src/daft-core/src/array/ops/from_arrow.rs
@@ -1,8 +1,12 @@
use std::sync::Arc;

use common_error::{DaftError, DaftResult};

use crate::{
array::{DataArray, FixedSizeListArray, ListArray, StructArray},
datatypes::{logical::LogicalArray, DaftDataType, DaftLogicalType, DaftPhysicalType, Field},
datatypes::{
logical::LogicalArray, DaftDataType, DaftLogicalType, DaftPhysicalType, Field, FieldRef,
},
DataType, Series,
};

@@ -11,11 +15,11 @@ pub trait FromArrow
where
Self: Sized,
{
fn from_arrow(field: &Field, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self>;
fn from_arrow(field: FieldRef, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self>;
}

impl<T: DaftPhysicalType> FromArrow for DataArray<T> {
fn from_arrow(field: &Field, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
fn from_arrow(field: FieldRef, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
DataArray::<T>::try_from((field.clone(), arrow_arr))
}
}
@@ -24,19 +28,19 @@ impl<L: DaftLogicalType> FromArrow for LogicalArray<L>
where
<L::PhysicalType as DaftDataType>::ArrayType: FromArrow,
{
fn from_arrow(field: &Field, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
let data_array_field = Field::new(field.name.clone(), field.dtype.to_physical());
fn from_arrow(field: FieldRef, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
let data_array_field = Arc::new(Field::new(field.name.clone(), field.dtype.to_physical()));
let physical_arrow_arr = arrow_arr.to_type(data_array_field.dtype.to_arrow()?);
let physical = <L::PhysicalType as DaftDataType>::ArrayType::from_arrow(
&data_array_field,
data_array_field,
physical_arrow_arr,
)?;
Ok(LogicalArray::<L>::new(field.clone(), physical))
}
}

impl FromArrow for FixedSizeListArray {
fn from_arrow(field: &Field, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
fn from_arrow(field: FieldRef, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
match (&field.dtype, arrow_arr.data_type()) {
(DataType::FixedSizeList(daft_child_dtype, daft_size), arrow2::datatypes::DataType::FixedSizeList(_arrow_child_field, arrow_size)) => {
if daft_size != arrow_size {
@@ -45,7 +49,7 @@ impl FromArrow for FixedSizeListArray {

let arrow_arr = arrow_arr.as_ref().as_any().downcast_ref::<arrow2::array::FixedSizeListArray>().unwrap();
let arrow_child_array = arrow_arr.values();
let child_series = Series::from_arrow(&Field::new("item", daft_child_dtype.as_ref().clone()), arrow_child_array.clone())?;
let child_series = Series::from_arrow(Arc::new(Field::new("item", daft_child_dtype.as_ref().clone())), arrow_child_array.clone())?;
Ok(FixedSizeListArray::new(
field.clone(),
child_series,
@@ -58,15 +62,15 @@
}

impl FromArrow for ListArray {
fn from_arrow(field: &Field, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
fn from_arrow(field: FieldRef, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
match (&field.dtype, arrow_arr.data_type()) {
(DataType::List(daft_child_dtype), arrow2::datatypes::DataType::List(arrow_child_field)) |
(DataType::List(daft_child_dtype), arrow2::datatypes::DataType::LargeList(arrow_child_field))
=> {
let arrow_arr = arrow_arr.to_type(arrow2::datatypes::DataType::LargeList(arrow_child_field.clone()));
let arrow_arr = arrow_arr.as_any().downcast_ref::<arrow2::array::ListArray<i64>>().unwrap();
let arrow_child_array = arrow_arr.values();
let child_series = Series::from_arrow(&Field::new("list", daft_child_dtype.as_ref().clone()), arrow_child_array.clone())?;
let child_series = Series::from_arrow(Arc::new(Field::new("list", daft_child_dtype.as_ref().clone())), arrow_child_array.clone())?;
Ok(ListArray::new(
field.clone(),
child_series,
@@ -80,7 +84,7 @@
}

impl FromArrow for StructArray {
fn from_arrow(field: &Field, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
fn from_arrow(field: FieldRef, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
match (&field.dtype, arrow_arr.data_type()) {
(DataType::Struct(fields), arrow2::datatypes::DataType::Struct(arrow_fields)) => {
if fields.len() != arrow_fields.len() {
@@ -91,7 +95,7 @@
let arrow_child_arrays = arrow_arr.values();

let child_series = fields.iter().zip(arrow_child_arrays.iter()).map(|(daft_field, arrow_arr)| {
Series::from_arrow(daft_field, arrow_arr.to_boxed())
Series::from_arrow(Arc::new(daft_field.clone()), arrow_arr.to_boxed())
}).collect::<DaftResult<Vec<Series>>>()?;

Ok(StructArray::new(
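
Every impl of `FromArrow` above now receives the field as a `FieldRef` (an `Arc<Field>`), so nested impls can forward the same allocation to their children instead of rebuilding or deep-cloning the field. The snippet below is a hedged sketch of generic code written against the new trait signature, as if it lived inside daft-core (the trait's visibility outside the crate is not shown in this diff).

use common_error::DaftResult;

use crate::array::ops::from_arrow::FromArrow;
use crate::datatypes::FieldRef;

// Generic helper: any Daft array type implementing FromArrow can be decoded
// from an arrow2 array plus a shared FieldRef.
fn decode_column<A: FromArrow>(
    field: FieldRef,
    arrow_arr: Box<dyn arrow2::array::Array>,
) -> DaftResult<A> {
    // Passing (or cloning) the FieldRef is just an Arc refcount bump, which is
    // the point of moving the trait away from `&Field`.
    A::from_arrow(field, arrow_arr)
}
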
7 changes: 5 additions & 2 deletions src/daft-core/src/array/ops/image.rs
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::io::{Seek, SeekFrom, Write};
use std::sync::Arc;
use std::vec;

use image::{ColorType, DynamicImage, ImageBuffer};
@@ -691,8 +692,10 @@ impl FixedShapeImageArray {
Box::new(arrow2::array::PrimitiveArray::from_vec(data)),
validity,
));
let physical_array =
FixedSizeListArray::from_arrow(&Field::new(name, (&arrow_dtype).into()), arrow_array)?;
let physical_array = FixedSizeListArray::from_arrow(
Arc::new(Field::new(name, (&arrow_dtype).into())),
arrow_array,
)?;
let logical_dtype = DataType::FixedShapeImage(*image_mode, height, width);
Ok(Self::new(Field::new(name, logical_dtype), physical_array))
}
2 changes: 2 additions & 0 deletions src/daft-core/src/datatypes/field.rs
@@ -17,6 +17,8 @@ pub struct Field {
pub metadata: Arc<Metadata>,
}

pub type FieldRef = Arc<Field>;

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, Hash)]
pub struct FieldID {
pub id: Arc<str>,
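
`FieldRef` is simply a shared pointer to a `Field`. Since a `Field` owns its name and dtype (plus a metadata handle), handing out an `Arc<Field>` lets `from_arrow` and nested children share one allocation instead of cloning the field by value. A minimal standalone sketch of the difference, using a simplified stand-in for `Field`:

use std::sync::Arc;

// Simplified stand-in for Daft's Field; the real struct also carries a dtype
// and metadata, which is what makes by-value clones worth avoiding.
#[derive(Clone)]
struct Field {
    name: String,
}

type FieldRef = Arc<Field>;

fn main() {
    let field = Field { name: "score".to_string() };

    // Cloning the value copies the owned String (and, in the real type, the dtype).
    let _deep_copy: Field = field.clone();

    // Cloning a FieldRef only bumps the Arc's reference count; both handles
    // point at the same underlying Field.
    let shared: FieldRef = Arc::new(field);
    let also_shared: FieldRef = Arc::clone(&shared);
    assert_eq!(Arc::strong_count(&shared), 2);
    assert_eq!(also_shared.name, "score");
}
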
1 change: 1 addition & 0 deletions src/daft-core/src/datatypes/mod.rs
@@ -16,6 +16,7 @@ pub use binary_ops::try_physical_supertype;
pub use dtype::DataType;
pub use field::Field;
pub use field::FieldID;
pub use field::FieldRef;
pub use image_format::ImageFormat;
pub use image_mode::ImageMode;
use num_traits::{Bounded, Float, FromPrimitive, Num, NumCast, ToPrimitive, Zero};
34 changes: 21 additions & 13 deletions src/daft-core/src/series/from.rs
@@ -11,20 +11,16 @@ use super::Series;
use crate::array::ops::from_arrow::FromArrow;
use crate::series::array_impl::IntoSeries;

impl TryFrom<(&str, Box<dyn arrow2::array::Array>)> for Series {
type Error = DaftError;

fn try_from(item: (&str, Box<dyn arrow2::array::Array>)) -> DaftResult<Self> {
let (name, array) = item;
let source_arrow_type = array.data_type();
let dtype: DataType = source_arrow_type.into();
let field = Arc::new(Field::new(name, dtype.clone()));

impl Series {
pub fn try_from_field_and_arrow_array(
field: Arc<Field>,
array: Box<dyn arrow2::array::Array>,
) -> DaftResult<Self> {
// TODO(Nested): Refactor this out with nested logical types in StructArray and ListArray
// Corner-case nested logical types that have not yet been migrated to new Array formats
// to hold only casted physical arrow arrays.
let physical_type = dtype.to_physical();
if (matches!(dtype, DataType::List(..)) || dtype.is_extension()) && physical_type != dtype {
let dtype = &field.dtype;
if matches!(dtype, DataType::List(..) | DataType::Extension(..)) && let physical_type = dtype.to_physical() && &physical_type != dtype {
let arrow_physical_type = physical_type.to_arrow()?;
let casted_array = arrow2::compute::cast::cast(
array.as_ref(),
@@ -35,12 +31,24 @@ impl TryFrom<(&str, Box<dyn arrow2::array::Array>)> for Series {
},
)?;
return Ok(
with_match_daft_types!(physical_type, |$T| <$T as DaftDataType>::ArrayType::from_arrow(field.as_ref(), casted_array)?.into_series()),
with_match_daft_types!(physical_type, |$T| <$T as DaftDataType>::ArrayType::from_arrow(field, casted_array)?.into_series()),
);
}

with_match_daft_types!(dtype, |$T| {
Ok(<$T as DaftDataType>::ArrayType::from_arrow(&field, array)?.into_series())
Ok(<$T as DaftDataType>::ArrayType::from_arrow(field, array)?.into_series())
})
}
}

impl TryFrom<(&str, Box<dyn arrow2::array::Array>)> for Series {
type Error = DaftError;

fn try_from(item: (&str, Box<dyn arrow2::array::Array>)) -> DaftResult<Self> {
let (name, array) = item;
let source_arrow_type = array.data_type();
let dtype: DataType = source_arrow_type.into();
let field = Arc::new(Field::new(name, dtype.clone()));
Self::try_from_field_and_arrow_array(field, array)
}
}
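
The `TryFrom<(&str, Box<dyn arrow2::array::Array>)>` impl is now a thin wrapper: it infers the Daft dtype from the arrow type, wraps it in a `FieldRef`, and delegates to the new `try_from_field_and_arrow_array`. A hedged usage sketch follows; it assumes it is compiled inside the Daft workspace (where `common_error`, `daft-core`, and the matching `arrow2` fork are available as dependencies) and that `DataType::Int64`, `Series::len`, and the re-export paths used below exist as named.

use std::sync::Arc;

use common_error::DaftResult;
use daft_core::datatypes::{DataType, Field};
use daft_core::Series;

fn build_series() -> DaftResult<()> {
    let arrow_arr: Box<dyn arrow2::array::Array> =
        Box::new(arrow2::array::PrimitiveArray::from_vec(vec![1i64, 2, 3]));

    // Path taken by the TryFrom impl above: infer the Daft dtype from the
    // arrow type, then delegate to try_from_field_and_arrow_array.
    let s1 = Series::try_from(("scores", arrow_arr.to_boxed()))?;

    // Calling the new constructor directly with an explicit FieldRef.
    let field = Arc::new(Field::new("scores", DataType::Int64));
    let s2 = Series::try_from_field_and_arrow_array(field, arrow_arr)?;

    assert_eq!(s1.len(), s2.len());
    Ok(())
}
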
7 changes: 5 additions & 2 deletions src/daft-core/src/series/mod.rs
@@ -11,7 +11,7 @@ use std::{

use crate::{
array::ops::{from_arrow::FromArrow, full::FullNull},
datatypes::{DataType, Field},
datatypes::{DataType, Field, FieldRef},
utils::display_table::make_comfy_table,
with_match_daft_types,
};
@@ -35,7 +35,10 @@ impl Series {
///
/// This function will check the provided [`Field`] (and all its associated potentially nested fields/dtypes) against
/// the provided [`arrow2::array::Array`] for compatibility, and returns an error if they do not match.
pub fn from_arrow(field: &Field, arrow_arr: Box<dyn arrow2::array::Array>) -> DaftResult<Self> {
pub fn from_arrow(
field: FieldRef,
arrow_arr: Box<dyn arrow2::array::Array>,
) -> DaftResult<Self> {
with_match_daft_types!(field.dtype, |$T| {
Ok(<<$T as DaftDataType>::ArrayType as FromArrow>::from_arrow(field, arrow_arr)?.into_series())
})
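
For callers, the visible change is that `Series::from_arrow` now takes the field as a `FieldRef` rather than `&Field`, so a call site that already holds an `Arc<Field>` hands over a cheap clone and the nested `FromArrow` impls reuse the same allocation. A minimal call-site sketch, under the same workspace assumptions as the previous example (`DataType::Float64` is likewise assumed to name the f64 dtype):

use std::sync::Arc;

use common_error::DaftResult;
use daft_core::datatypes::{DataType, Field, FieldRef};
use daft_core::Series;

fn from_arrow_call_site() -> DaftResult<Series> {
    // An arrow2 array standing in for one decoded CSV column.
    let arr: Box<dyn arrow2::array::Array> =
        Box::new(arrow2::array::PrimitiveArray::from_vec(vec![1.5f64, 2.0, 3.25]));

    // Only an Arc handle is moved into from_arrow; `field` itself remains usable
    // afterwards (e.g. to build sibling columns with the same schema entry).
    let field: FieldRef = Arc::new(Field::new("score", DataType::Float64));
    Series::from_arrow(field.clone(), arr)
}
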
3 changes: 2 additions & 1 deletion src/daft-csv/Cargo.toml
@@ -12,6 +12,7 @@ csv-async = "1.2.6"
daft-compression = {path = "../daft-compression", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-decoding = {path = "../daft-decoding"}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
@@ -33,7 +34,7 @@ rstest = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python"]
python = ["dep:pyo3", "dep:pyo3-log", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-dsl/python"]

[package]
edition = {workspace = true}
(Diffs for the remaining changed files are not rendered here.)
