Skip to content

Commit

Permalink
[FEAT] Support hive partitioned reads (#3029)
Browse files Browse the repository at this point in the history
Adds support for reading hive-style partitioned tables via a new
optional `hive_partitioning` parameter for the `read_{csv, json,
parquet}` functions.

This support includes:
1. Partitioning pruning on hive partitions.
2. Schema inference on hive partition values (which can overridden by
user-provided schemas).
3. Support for interpreting `__HIVE_DEFAULT_PARTITIONS__` partition
values as null values (same behaviour as Hive).

---------

Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
desmondcheongzx and colin-ho authored Nov 5, 2024
1 parent 96c538b commit c1d82c5
Show file tree
Hide file tree
Showing 20 changed files with 664 additions and 121 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ members = [
"src/daft-scheduler",
"src/daft-sketch",
"src/daft-sql",
"src/daft-writers",
"src/daft-table",
"src/daft-writers",
"src/hyperloglog",
"src/parquet2"
]
Expand Down
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ class ScanOperatorHandle:
glob_path: list[str],
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
hive_partitioning: bool,
infer_schema: bool,
schema: PySchema | None = None,
file_path_column: str | None = None,
Expand Down
3 changes: 3 additions & 0 deletions daft/io/_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def read_csv(
allow_variable_columns: bool = False,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
Expand All @@ -56,6 +57,7 @@ def read_csv(
allow_variable_columns (bool): Whether to allow for variable number of columns in the CSV, defaults to False. If set to True, Daft will append nulls to rows with less columns than the schema, and ignore extra columns in rows with more columns
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
Expand Down Expand Up @@ -100,5 +102,6 @@ def read_csv(
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
hive_partitioning=hive_partitioning,
)
return DataFrame(builder)
3 changes: 3 additions & 0 deletions daft/io/_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def read_json(
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
schema_hints: Optional[Dict[str, DataType]] = None,
_buffer_size: Optional[int] = None,
Expand All @@ -43,6 +44,7 @@ def read_json(
schema (dict[str, DataType]): A schema that is used as the definitive schema for the JSON if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
is currently experimental.
Expand Down Expand Up @@ -77,5 +79,6 @@ def read_json(
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
hive_partitioning=hive_partitioning,
)
return DataFrame(builder)
3 changes: 3 additions & 0 deletions daft/io/_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def read_parquet(
schema: Optional[Dict[str, DataType]] = None,
io_config: Optional["IOConfig"] = None,
file_path_column: Optional[str] = None,
hive_partitioning: bool = False,
use_native_downloader: bool = True,
coerce_int96_timestamp_unit: Optional[Union[str, TimeUnit]] = None,
schema_hints: Optional[Dict[str, DataType]] = None,
Expand All @@ -47,6 +48,7 @@ def read_parquet(
schema (dict[str, DataType]): A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred.
io_config (IOConfig): Config to be used with the native downloader
file_path_column: Include the source path(s) as a column with this name. Defaults to None.
hive_partitioning: Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.
use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet.
coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
_multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
Expand Down Expand Up @@ -96,5 +98,6 @@ def read_parquet(
file_format_config=file_format_config,
storage_config=storage_config,
file_path_column=file_path_column,
hive_partitioning=hive_partitioning,
)
return DataFrame(builder)
2 changes: 2 additions & 0 deletions daft/io/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def get_tabular_files_scan(
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
file_path_column: str | None = None,
hive_partitioning: bool = False,
) -> LogicalPlanBuilder:
"""Returns a TabularFilesScan LogicalPlan for a given glob filepath."""
# Glob the path using the Runner
Expand All @@ -42,6 +43,7 @@ def get_tabular_files_scan(
infer_schema=infer_schema,
schema=_get_schema_from_dict(schema)._schema if schema is not None else None,
file_path_column=file_path_column,
hive_partitioning=hive_partitioning,
)

builder = LogicalPlanBuilder.from_tabular_scan(
Expand Down
2 changes: 2 additions & 0 deletions src/common/error/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub enum DaftError {
FmtError(#[from] std::fmt::Error),
#[error("DaftError::RegexError {0}")]
RegexError(#[from] regex::Error),
#[error("DaftError::FromUtf8Error {0}")]
FromUtf8Error(#[from] std::string::FromUtf8Error),
#[error("Not Yet Implemented: {0}")]
NotImplemented(String),
}
Expand Down
6 changes: 3 additions & 3 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ impl MicroPartition {
field_id_mapping.clone(),
parquet_metadata,
chunk_size,
scan_task.file_path_column.as_deref(),
scan_task.generated_fields.clone(),
)
.context(DaftCoreComputeSnafu)
}
Expand Down Expand Up @@ -1026,7 +1026,7 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
parquet_metadata: Option<Vec<Arc<FileMetaData>>>,
chunk_size: Option<usize>,
file_path_column: Option<&str>,
generated_fields: Option<SchemaRef>,
) -> DaftResult<MicroPartition> {
if let Some(so) = start_offset
&& so > 0
Expand Down Expand Up @@ -1214,7 +1214,7 @@ pub fn read_parquet_into_micropartition<T: AsRef<str>>(
}),
num_rows,
),
file_path_column.map(|s| s.to_string()),
generated_fields,
);

let fill_map = scan_task.partition_spec().map(|pspec| pspec.to_fill_map());
Expand Down
2 changes: 1 addition & 1 deletion src/daft-micropartition/src/ops/cast_to_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl MicroPartition {
schema,
scan_task.storage_config.clone(),
scan_task.pushdowns.clone(),
scan_task.file_path_column.clone(),
scan_task.generated_fields.clone(),
))
};
Ok(Self::new_unloaded(
Expand Down
83 changes: 43 additions & 40 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use common_daft_config::DaftPlanningConfig;
use common_display::mermaid::MermaidDisplayOptions;
use common_error::DaftResult;
use common_error::{DaftError, DaftResult};
use common_file_formats::{FileFormat, FileFormatConfig, ParquetSourceConfig};
use common_io_config::IOConfig;
use daft_core::{
Expand All @@ -19,7 +19,6 @@ use daft_scan::{
PhysicalScanInfo, Pushdowns, ScanOperatorRef,
};
use daft_schema::{
dtype::DataType,
field::Field,
schema::{Schema, SchemaRef},
};
Expand Down Expand Up @@ -201,46 +200,41 @@ impl LogicalPlanBuilder {
partitioning_keys.into(),
pushdowns.clone().unwrap_or_default(),
));
// If column selection (projection) pushdown is specified, prune unselected columns from the schema.
// If file path column is specified, add it to the schema.
let output_schema = match (&pushdowns, &scan_operator.0.file_path_column()) {
(
Some(Pushdowns {
columns: Some(columns),
..
}),
file_path_column_opt,
) if columns.len() < schema.fields.len() => {
let pruned_fields = schema
.fields
.iter()
.filter(|(name, _)| columns.contains(name))
.map(|(_, field)| field.clone());

let finalized_fields = match file_path_column_opt {
Some(file_path_column) => pruned_fields
.chain(std::iter::once(Field::new(
(*file_path_column).to_string(),
DataType::Utf8,
)))
.collect::<Vec<_>>(),
None => pruned_fields.collect::<Vec<_>>(),
};
Arc::new(Schema::new(finalized_fields)?)
// If file path column is specified, check that it doesn't conflict with any column names in the schema.
if let Some(file_path_column) = &scan_operator.0.file_path_column() {
if schema.names().contains(&(*file_path_column).to_string()) {
return Err(DaftError::ValueError(format!(
"Attempting to make a Schema with a file path column name that already exists: {}",
file_path_column
)));
}
(None, Some(file_path_column)) => {
let schema_with_file_path = schema
.fields
.values()
.cloned()
.chain(std::iter::once(Field::new(
(*file_path_column).to_string(),
DataType::Utf8,
)))
.collect::<Vec<_>>();
Arc::new(Schema::new(schema_with_file_path)?)
}
// Add generated fields to the schema.
let schema_with_generated_fields = {
if let Some(generated_fields) = scan_operator.0.generated_fields() {
// We use the non-distinct union here because some scan operators have table schema information that
// already contain partitioned fields. For example,the deltalake scan operator takes the table schema.
Arc::new(schema.non_distinct_union(&generated_fields))
} else {
schema
}
_ => schema,
};
// If column selection (projection) pushdown is specified, prune unselected columns from the schema.
let output_schema = if let Some(Pushdowns {
columns: Some(columns),
..
}) = &pushdowns
&& columns.len() < schema_with_generated_fields.fields.len()
{
let pruned_upstream_schema = schema_with_generated_fields
.fields
.iter()
.filter(|&(name, _)| columns.contains(name))
.map(|(_, field)| field.clone())
.collect::<Vec<_>>();
Arc::new(Schema::new(pruned_upstream_schema)?)
} else {
schema_with_generated_fields
};
let logical_plan: LogicalPlan =
logical_ops::Source::new(output_schema, source_info.into()).into();
Expand Down Expand Up @@ -640,6 +634,7 @@ pub struct ParquetScanBuilder {
pub multithreaded: bool,
pub schema: Option<SchemaRef>,
pub file_path_column: Option<String>,
pub hive_partitioning: bool,
}

impl ParquetScanBuilder {
Expand All @@ -661,6 +656,7 @@ impl ParquetScanBuilder {
schema: None,
io_config: None,
file_path_column: None,
hive_partitioning: false,
}
}
pub fn infer_schema(mut self, infer_schema: bool) -> Self {
Expand Down Expand Up @@ -693,6 +689,7 @@ impl ParquetScanBuilder {
self.multithreaded = multithreaded;
self
}

pub fn schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
Expand All @@ -703,6 +700,11 @@ impl ParquetScanBuilder {
self
}

pub fn hive_partitioning(mut self, hive_partitioning: bool) -> Self {
self.hive_partitioning = hive_partitioning;
self
}

pub fn finish(self) -> DaftResult<LogicalPlanBuilder> {
let cfg = ParquetSourceConfig {
coerce_int96_timestamp_unit: self.coerce_int96_timestamp_unit,
Expand All @@ -720,6 +722,7 @@ impl ParquetScanBuilder {
self.infer_schema,
self.schema,
self.file_path_column,
self.hive_partitioning,
)?);

LogicalPlanBuilder::table_scan(ScanOperatorRef(operator), None)
Expand Down
4 changes: 4 additions & 0 deletions src/daft-scan/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[dependencies]
arrow2 = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-display = {path = "../common/display", default-features = false}
common-error = {path = "../common/error", default-features = false}
Expand All @@ -8,6 +9,7 @@ common-py-serde = {path = "../common/py-serde", default-features = false}
common-runtime = {path = "../common/runtime", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-csv = {path = "../daft-csv", default-features = false}
daft-decoding = {path = "../daft-decoding", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-json = {path = "../daft-json", default-features = false}
Expand All @@ -16,11 +18,13 @@ daft-schema = {path = "../daft-schema", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true}
itertools = {workspace = true}
parquet2 = {workspace = true}
pyo3 = {workspace = true, optional = true}
serde = {workspace = true}
snafu = {workspace = true}
urlencoding = "2.1.3"

[features]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-table/python", "daft-stats/python", "common-file-formats/python", "common-io-config/python", "common-daft-config/python", "daft-schema/python"]
Expand Down
4 changes: 4 additions & 0 deletions src/daft-scan/src/anonymous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ impl ScanOperator for AnonymousScanOperator {
None
}

fn generated_fields(&self) -> Option<SchemaRef> {
None
}

fn can_absorb_filter(&self) -> bool {
false
}
Expand Down
Loading

0 comments on commit c1d82c5

Please sign in to comment.