From ecc2970db440a8cb827ecc2f2f255b2481d7aa2d Mon Sep 17 00:00:00 2001 From: "R. C. Howell" <5731503+rchowell@users.noreply.github.com> Date: Fri, 17 Jan 2025 12:36:21 -0800 Subject: [PATCH] feat(iceberg): Adds support for read_iceberg with metadata_location to Daft-SQL (#3701) --- .gitignore | 3 + daft/io/_iceberg.py | 12 ++- src/daft-scan/src/builder.rs | 43 +++++++++++ src/daft-scan/src/storage_config.rs | 6 ++ src/daft-sql/src/functions.rs | 1 + src/daft-sql/src/table_provider/mod.rs | 56 +++----------- .../src/table_provider/read_deltalake.rs | 44 +++++++++++ .../src/table_provider/read_iceberg.rs | 74 +++++++++++++++++++ tests/sql/test_table_functions/__init__.py | 0 .../test_table_functions/test_read_iceberg.py | 20 +++++ .../test_table_functions.py} | 0 11 files changed, 211 insertions(+), 48 deletions(-) create mode 100644 src/daft-sql/src/table_provider/read_deltalake.rs create mode 100644 src/daft-sql/src/table_provider/read_iceberg.rs create mode 100644 tests/sql/test_table_functions/__init__.py create mode 100644 tests/sql/test_table_functions/test_read_iceberg.py rename tests/sql/{test_table_funcs.py => test_table_functions/test_table_functions.py} (100%) diff --git a/.gitignore b/.gitignore index bb5369738d..e21b99245a 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,6 @@ log/ # helix editor .helix + +# uv +uv.lock diff --git a/daft/io/_iceberg.py b/daft/io/_iceberg.py index c3ea30aaa9..a627dadb92 100644 --- a/daft/io/_iceberg.py +++ b/daft/io/_iceberg.py @@ -1,6 +1,6 @@ # isort: dont-add-import: from __future__ import annotations -from typing import TYPE_CHECKING, Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional, Union from daft import context from daft.api_annotations import PublicAPI @@ -53,7 +53,7 @@ def get_first_property_value(*property_names: str) -> Optional[Any]: @PublicAPI def read_iceberg( - table: "pyiceberg.table.Table", + table: Union[str, "pyiceberg.table.Table"], snapshot_id: Optional[int] = None, 
io_config: Optional["IOConfig"] = None, ) -> DataFrame: @@ -75,15 +75,21 @@ def read_iceberg( official project for Python. Args: - table (pyiceberg.table.Table): `PyIceberg Table `__ created using the PyIceberg library + table (str or pyiceberg.table.Table): `PyIceberg Table `__ created using the PyIceberg library snapshot_id (int, optional): Snapshot ID of the table to query io_config (IOConfig, optional): A custom IOConfig to use when accessing Iceberg object storage data. If provided, configurations set in `table` are ignored. Returns: DataFrame: a DataFrame with the schema converted from the specified Iceberg table """ + import pyiceberg + from daft.iceberg.iceberg_scan import IcebergScanOperator + # support for read_iceberg('path/to/metadata.json') + if isinstance(table, str): + table = pyiceberg.table.StaticTable.from_metadata(metadata_location=table) + io_config = ( _convert_iceberg_file_io_properties_to_io_config(table.io.properties) if io_config is None else io_config ) diff --git a/src/daft-scan/src/builder.rs b/src/daft-scan/src/builder.rs index 18b7cda84d..30b6da294a 100644 --- a/src/daft-scan/src/builder.rs +++ b/src/daft-scan/src/builder.rs @@ -399,3 +399,46 @@ pub fn delta_scan( ) -> DaftResult { panic!("Delta Lake scan requires the 'python' feature to be enabled.") } + +/// Creates a logical scan operator from a Python IcebergScanOperator. 
+/// ex: +/// ```python +/// iceberg_table = pyiceberg.table.StaticTable.from_metadata(metadata_location) +/// iceberg_scan = daft.iceberg.iceberg_scan.IcebergScanOperator(iceberg_table, snapshot_id, storage_config) +/// ``` +#[cfg(feature = "python")] +pub fn iceberg_scan>( + metadata_location: T, + snapshot_id: Option, + io_config: Option, +) -> DaftResult { + use pyo3::IntoPyObjectExt; + let storage_config: StorageConfig = io_config.unwrap_or_default().into(); + let scan_operator = Python::with_gil(|py| -> DaftResult { + // iceberg_table = pyiceberg.table.StaticTable.from_metadata(metadata_location) + let iceberg_table_module = PyModule::import(py, "pyiceberg.table")?; + let iceberg_static_table = iceberg_table_module.getattr("StaticTable")?; + let iceberg_table = + iceberg_static_table.call_method1("from_metadata", (metadata_location.as_ref(),))?; + // iceberg_scan = daft.iceberg.iceberg_scan.IcebergScanOperator(iceberg_table, snapshot_id, storage_config) + let iceberg_scan_module = PyModule::import(py, "daft.iceberg.iceberg_scan")?; + let iceberg_scan_class = iceberg_scan_module.getattr("IcebergScanOperator")?; + let iceberg_scan = iceberg_scan_class + .call1((iceberg_table, snapshot_id, storage_config))? + .into_py_any(py)?; + Ok(ScanOperatorHandle::from_python_scan_operator( + iceberg_scan, + py, + )?) 
+ })?; + LogicalPlanBuilder::table_scan(scan_operator.into(), None) +} + +#[cfg(not(feature = "python"))] +pub fn iceberg_scan>( + uri: T, + snapshot_id: Option, + io_config: Option, +) -> DaftResult { + panic!("Iceberg scan requires the 'python' feature to be enabled.") +} diff --git a/src/daft-scan/src/storage_config.rs b/src/daft-scan/src/storage_config.rs index 270964b705..b4f8a2cceb 100644 --- a/src/daft-scan/src/storage_config.rs +++ b/src/daft-scan/src/storage_config.rs @@ -65,6 +65,12 @@ impl Default for StorageConfig { } } +impl From for StorageConfig { + fn from(io_config: IOConfig) -> Self { + Self::new_internal(true, Some(io_config)) + } +} + #[cfg(feature = "python")] #[pymethods] impl StorageConfig { diff --git a/src/daft-sql/src/functions.rs b/src/daft-sql/src/functions.rs index 54cac3491f..79e92ea2a1 100644 --- a/src/daft-sql/src/functions.rs +++ b/src/daft-sql/src/functions.rs @@ -92,6 +92,7 @@ pub trait SQLFunction: Send + Sync { .collect::>>() } + // nit cleanup: argument consistency with SQLTableFunction fn to_expr(&self, inputs: &[FunctionArg], planner: &SQLPlanner) -> SQLPlannerResult; /// Produce the docstrings for this SQL function, parametrized by an alias which is the function name to invoke this in SQL diff --git a/src/daft-sql/src/table_provider/mod.rs b/src/daft-sql/src/table_provider/mod.rs index 9951dd099e..fabce9bdc8 100644 --- a/src/daft-sql/src/table_provider/mod.rs +++ b/src/daft-sql/src/table_provider/mod.rs @@ -1,11 +1,16 @@ -pub mod read_csv; -pub mod read_json; -pub mod read_parquet; +mod read_csv; +mod read_deltalake; +mod read_iceberg; +mod read_json; +mod read_parquet; + use std::{collections::HashMap, sync::Arc}; use daft_logical_plan::LogicalPlanBuilder; use once_cell::sync::Lazy; use read_csv::ReadCsvFunction; +use read_deltalake::ReadDeltalakeFunction; +use read_iceberg::SqlReadIceberg; use read_json::ReadJsonFunction; use read_parquet::ReadParquetFunction; use sqlparser::ast::TableFunctionArgs; @@ -20,10 +25,10 @@ use 
crate::{ pub(crate) static SQL_TABLE_FUNCTIONS: Lazy = Lazy::new(|| { let mut functions = SQLTableFunctions::new(); functions.add_fn("read_csv", ReadCsvFunction); + functions.add_fn("read_deltalake", ReadDeltalakeFunction); + functions.add_fn("read_iceberg", SqlReadIceberg); functions.add_fn("read_json", ReadJsonFunction); functions.add_fn("read_parquet", ReadParquetFunction); - #[cfg(feature = "python")] - functions.add_fn("read_deltalake", ReadDeltalakeFunction); functions }); @@ -70,6 +75,7 @@ impl<'a> SQLPlanner<'a> { } } +// nit cleanup: switch param order and rename to `to_logical_plan` for consistency with SQLFunction. pub(crate) trait SQLTableFunction: Send + Sync { fn plan( &self, @@ -77,43 +83,3 @@ pub(crate) trait SQLTableFunction: Send + Sync { args: &TableFunctionArgs, ) -> SQLPlannerResult; } - -pub struct ReadDeltalakeFunction; - -#[cfg(feature = "python")] -impl SQLTableFunction for ReadDeltalakeFunction { - fn plan( - &self, - planner: &SQLPlanner, - args: &TableFunctionArgs, - ) -> SQLPlannerResult { - let (uri, io_config) = match args.args.as_slice() { - [uri] => (uri, None), - [uri, io_config] => { - let args = planner.parse_function_args(&[io_config.clone()], &["io_config"], 0)?; - let io_config = args.get_named("io_config").map(expr_to_iocfg).transpose()?; - - (uri, io_config) - } - _ => unsupported_sql_err!("Expected one or two arguments"), - }; - let uri = planner.plan_function_arg(uri)?; - - let Some(uri) = uri.as_literal().and_then(|lit| lit.as_str()) else { - unsupported_sql_err!("Expected a string literal for the first argument"); - }; - - daft_scan::builder::delta_scan(uri, io_config, true).map_err(From::from) - } -} - -#[cfg(not(feature = "python"))] -impl SQLTableFunction for ReadDeltalakeFunction { - fn plan( - &self, - planner: &SQLPlanner, - args: &TableFunctionArgs, - ) -> SQLPlannerResult { - unsupported_sql_err!("`read_deltalake` function is not supported. 
Enable the `python` feature to use this function.") - } -} diff --git a/src/daft-sql/src/table_provider/read_deltalake.rs b/src/daft-sql/src/table_provider/read_deltalake.rs new file mode 100644 index 0000000000..142ce8ec89 --- /dev/null +++ b/src/daft-sql/src/table_provider/read_deltalake.rs @@ -0,0 +1,44 @@ +use daft_logical_plan::LogicalPlanBuilder; +use sqlparser::ast::TableFunctionArgs; + +use super::{expr_to_iocfg, SQLTableFunction}; +use crate::{error::SQLPlannerResult, unsupported_sql_err, SQLPlanner}; + +pub(super) struct ReadDeltalakeFunction; + +#[cfg(feature = "python")] +impl SQLTableFunction for ReadDeltalakeFunction { + fn plan( + &self, + planner: &SQLPlanner, + args: &TableFunctionArgs, + ) -> SQLPlannerResult { + let (uri, io_config) = match args.args.as_slice() { + [uri] => (uri, None), + [uri, io_config] => { + let args = planner.parse_function_args(&[io_config.clone()], &["io_config"], 0)?; + let io_config = args.get_named("io_config").map(expr_to_iocfg).transpose()?; + (uri, io_config) + } + _ => unsupported_sql_err!("Expected one or two arguments"), + }; + let uri = planner.plan_function_arg(uri)?; + + let Some(uri) = uri.as_literal().and_then(|lit| lit.as_str()) else { + unsupported_sql_err!("Expected a string literal for the first argument"); + }; + + daft_scan::builder::delta_scan(uri, io_config, true).map_err(From::from) + } +} + +#[cfg(not(feature = "python"))] +impl SQLTableFunction for ReadDeltalakeFunction { + fn plan( + &self, + planner: &SQLPlanner, + args: &TableFunctionArgs, + ) -> SQLPlannerResult { + unsupported_sql_err!("`read_deltalake` function is not supported. 
Enable the `python` feature to use this function.") + } +} diff --git a/src/daft-sql/src/table_provider/read_iceberg.rs b/src/daft-sql/src/table_provider/read_iceberg.rs new file mode 100644 index 0000000000..b8b9e30d8a --- /dev/null +++ b/src/daft-sql/src/table_provider/read_iceberg.rs @@ -0,0 +1,74 @@ +use common_io_config::IOConfig; +use daft_logical_plan::LogicalPlanBuilder; +use sqlparser::ast::TableFunctionArgs; + +use super::SQLTableFunction; +use crate::{ + error::{PlannerError, SQLPlannerResult}, + functions::{self, SQLFunctionArguments}, + SQLPlanner, +}; + +/// The Daft-SQL `read_iceberg` table-value function. +pub(super) struct SqlReadIceberg; + +/// The Daft-SQL `read_iceberg` table-value function arguments. +struct SqlReadIcebergArgs { + metadata_location: String, + snapshot_id: Option, + io_config: Option, +} + +impl SqlReadIcebergArgs { + /// Like a TryFrom but from TableFunctionArgs directly and passing the planner. + fn try_from(planner: &SQLPlanner, args: &TableFunctionArgs) -> SQLPlannerResult { + planner.plan_function_args(&args.args, &["snapshot_id", "io_config"], 1) + } +} + +impl TryFrom for SqlReadIcebergArgs { + type Error = PlannerError; + + /// This is required to use `planner.plan_function_args` + fn try_from(args: SQLFunctionArguments) -> Result { + let metadata_location: String = args + .try_get_positional(0)? + .expect("read_iceberg requires a path"); + let snapshot_id: Option = args.try_get_named("snapshot_id")?; + let io_config: Option = functions::args::parse_io_config(&args)?.into(); + Ok(Self { + metadata_location, + snapshot_id, + io_config, + }) + } +} + +/// Translates the `read_iceberg` table-value function to a logical scan operator.
+#[cfg(feature = "python")] +impl SQLTableFunction for SqlReadIceberg { + fn plan( + &self, + planner: &SQLPlanner, + args: &TableFunctionArgs, + ) -> SQLPlannerResult { + let args = SqlReadIcebergArgs::try_from(planner, args)?; + Ok(daft_scan::builder::iceberg_scan( + args.metadata_location, + args.snapshot_id, + args.io_config, + )?) + } +} + +/// Translates the `read_iceberg` table-value function to a logical scan operator (errors without python feature). +#[cfg(not(feature = "python"))] +impl SQLTableFunction for SqlReadIceberg { + fn plan( + &self, + planner: &SQLPlanner, + args: &TableFunctionArgs, + ) -> SQLPlannerResult { + crate::unsupported_sql_err!("`read_iceberg` function is not supported. Enable the `python` feature to use this function.") + } +} diff --git a/tests/sql/test_table_functions/__init__.py b/tests/sql/test_table_functions/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/sql/test_table_functions/test_read_iceberg.py b/tests/sql/test_table_functions/test_read_iceberg.py new file mode 100644 index 0000000000..94e9ab43fb --- /dev/null +++ b/tests/sql/test_table_functions/test_read_iceberg.py @@ -0,0 +1,20 @@ +import pytest + +import daft + + +@pytest.mark.skip( + "invoke manually via `uv run tests/sql/test_table_functions/test_read_iceberg.py `" +) +def test_read_iceberg(metadata_location): + df = daft.sql(f"SELECT * FROM read_iceberg('{metadata_location}')") + print(df.collect()) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("usage: test_read_iceberg.py ") + sys.exit(1) + test_read_iceberg(metadata_location=sys.argv[1]) diff --git a/tests/sql/test_table_funcs.py b/tests/sql/test_table_functions/test_table_functions.py similarity index 100% rename from tests/sql/test_table_funcs.py rename to tests/sql/test_table_functions/test_table_functions.py