feat(iceberg): Adds support for read_iceberg with metadata_location to Daft-SQL (#3701)
rchowell authored Jan 17, 2025
1 parent 412cef4 commit ecc2970
Showing 11 changed files with 211 additions and 48 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -43,3 +43,6 @@ log/

# helix editor
.helix

# uv
uv.lock
12 changes: 9 additions & 3 deletions daft/io/_iceberg.py
@@ -1,6 +1,6 @@
# isort: dont-add-import: from __future__ import annotations

-from typing import TYPE_CHECKING, Any, Dict, Optional
+from typing import TYPE_CHECKING, Any, Dict, Optional, Union

from daft import context
from daft.api_annotations import PublicAPI
@@ -53,7 +53,7 @@ def get_first_property_value(*property_names: str) -> Optional[Any]:

@PublicAPI
def read_iceberg(
table: "pyiceberg.table.Table",
table: Union[str, "pyiceberg.table.Table"],
snapshot_id: Optional[int] = None,
io_config: Optional["IOConfig"] = None,
) -> DataFrame:
@@ -75,15 +75,21 @@
official project for Python.
    Args:
-        table (pyiceberg.table.Table): `PyIceberg Table <https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table>`__ created using the PyIceberg library
+        table (str or pyiceberg.table.Table): `PyIceberg Table <https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table>`__ created using the PyIceberg library
        snapshot_id (int, optional): Snapshot ID of the table to query
        io_config (IOConfig, optional): A custom IOConfig to use when accessing Iceberg object storage data. If provided, configurations set in `table` are ignored.
    Returns:
        DataFrame: a DataFrame with the schema converted from the specified Iceberg table
    """
    import pyiceberg

    from daft.iceberg.iceberg_scan import IcebergScanOperator

    # support for read_iceberg('path/to/metadata.json')
    if isinstance(table, str):
        table = pyiceberg.table.StaticTable.from_metadata(metadata_location=table)

    io_config = (
        _convert_iceberg_file_io_properties_to_io_config(table.io.properties) if io_config is None else io_config
    )
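With this change, `read_iceberg` accepts either a PyIceberg `Table` object or a string metadata location. A minimal usage sketch of the new string form (the bucket and metadata path below are hypothetical):

```python
import daft

# New in this commit: pass a metadata.json location instead of a pyiceberg
# Table object; Daft loads a pyiceberg StaticTable from it under the hood.
df = daft.read_iceberg("s3://my-bucket/warehouse/events/metadata/v2.metadata.json")

# snapshot_id remains optional and defaults to the table's latest snapshot.
df.show()
```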
43 changes: 43 additions & 0 deletions src/daft-scan/src/builder.rs
@@ -399,3 +399,46 @@ pub fn delta_scan<T: IntoGlobPath>(
) -> DaftResult<LogicalPlanBuilder> {
panic!("Delta Lake scan requires the 'python' feature to be enabled.")
}

/// Creates a logical scan operator from a Python IcebergScanOperator.
/// ex:
/// ```python
/// iceberg_table = pyiceberg.table.StaticTable.from_metadata(metadata_location)
/// iceberg_scan = daft.iceberg.iceberg_scan.IcebergScanOperator(iceberg_table, snapshot_id, storage_config)
/// ```
#[cfg(feature = "python")]
pub fn iceberg_scan<T: AsRef<str>>(
    metadata_location: T,
    snapshot_id: Option<usize>,
    io_config: Option<IOConfig>,
) -> DaftResult<LogicalPlanBuilder> {
    use pyo3::IntoPyObjectExt;
    let storage_config: StorageConfig = io_config.unwrap_or_default().into();
    let scan_operator = Python::with_gil(|py| -> DaftResult<ScanOperatorHandle> {
        // iceberg_table = pyiceberg.table.StaticTable.from_metadata(metadata_location)
        let iceberg_table_module = PyModule::import(py, "pyiceberg.table")?;
        let iceberg_static_table = iceberg_table_module.getattr("StaticTable")?;
        let iceberg_table =
            iceberg_static_table.call_method1("from_metadata", (metadata_location.as_ref(),))?;
        // iceberg_scan = daft.iceberg.iceberg_scan.IcebergScanOperator(iceberg_table, snapshot_id, storage_config)
        let iceberg_scan_module = PyModule::import(py, "daft.iceberg.iceberg_scan")?;
        let iceberg_scan_class = iceberg_scan_module.getattr("IcebergScanOperator")?;
        let iceberg_scan = iceberg_scan_class
            .call1((iceberg_table, snapshot_id, storage_config))?
            .into_py_any(py)?;
        Ok(ScanOperatorHandle::from_python_scan_operator(
            iceberg_scan,
            py,
        )?)
    })?;
    LogicalPlanBuilder::table_scan(scan_operator.into(), None)
}

#[cfg(not(feature = "python"))]
pub fn iceberg_scan<T: AsRef<str>>(
    uri: T,
    snapshot_id: Option<usize>,
    io_config: Option<IOConfig>,
) -> DaftResult<LogicalPlanBuilder> {
    panic!("Iceberg scan requires the 'python' feature to be enabled.")
}
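The builder mirrors what a user could already do by hand with PyIceberg's catalog-free `StaticTable`. A rough Python equivalent of the calls made under the GIL above (the metadata path is hypothetical):

```python
import daft
from pyiceberg.table import StaticTable

# Load an Iceberg table directly from its metadata file; no catalog is needed.
table = StaticTable.from_metadata("/tmp/warehouse/events/metadata/v3.metadata.json")

# Hand the static table to Daft, which wraps it in an IcebergScanOperator
# just as the Rust builder does.
df = daft.read_iceberg(table)
```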
6 changes: 6 additions & 0 deletions src/daft-scan/src/storage_config.rs
@@ -65,6 +65,12 @@ impl Default for StorageConfig {
}
}

impl From<IOConfig> for StorageConfig {
    fn from(io_config: IOConfig) -> Self {
        Self::new_internal(true, Some(io_config))
    }
}

#[cfg(feature = "python")]
#[pymethods]
impl StorageConfig {
1 change: 1 addition & 0 deletions src/daft-sql/src/functions.rs
@@ -92,6 +92,7 @@ pub trait SQLFunction: Send + Sync {
.collect::<SQLPlannerResult<Vec<_>>>()
}

+    // nit cleanup: argument consistency with SQLTableFunction
    fn to_expr(&self, inputs: &[FunctionArg], planner: &SQLPlanner) -> SQLPlannerResult<ExprRef>;

/// Produce the docstrings for this SQL function, parametrized by an alias which is the function name to invoke this in SQL
56 changes: 11 additions & 45 deletions src/daft-sql/src/table_provider/mod.rs
@@ -1,11 +1,16 @@
-pub mod read_csv;
-pub mod read_json;
-pub mod read_parquet;
+mod read_csv;
+mod read_deltalake;
+mod read_iceberg;
+mod read_json;
+mod read_parquet;

use std::{collections::HashMap, sync::Arc};

use daft_logical_plan::LogicalPlanBuilder;
use once_cell::sync::Lazy;
use read_csv::ReadCsvFunction;
use read_deltalake::ReadDeltalakeFunction;
use read_iceberg::SqlReadIceberg;
use read_json::ReadJsonFunction;
use read_parquet::ReadParquetFunction;
use sqlparser::ast::TableFunctionArgs;
@@ -20,10 +25,10 @@ use crate::{
pub(crate) static SQL_TABLE_FUNCTIONS: Lazy<SQLTableFunctions> = Lazy::new(|| {
    let mut functions = SQLTableFunctions::new();
    functions.add_fn("read_csv", ReadCsvFunction);
+    functions.add_fn("read_deltalake", ReadDeltalakeFunction);
+    functions.add_fn("read_iceberg", SqlReadIceberg);
    functions.add_fn("read_json", ReadJsonFunction);
    functions.add_fn("read_parquet", ReadParquetFunction);
-    #[cfg(feature = "python")]
-    functions.add_fn("read_deltalake", ReadDeltalakeFunction);
    functions
});

@@ -70,50 +75,11 @@ impl<'a> SQLPlanner<'a> {
}
}

+// nit cleanup: switch param order and rename to `to_logical_plan` for consistency with SQLFunction.
pub(crate) trait SQLTableFunction: Send + Sync {
    fn plan(
        &self,
        planner: &SQLPlanner,
        args: &TableFunctionArgs,
    ) -> SQLPlannerResult<LogicalPlanBuilder>;
}

-pub struct ReadDeltalakeFunction;
-
-#[cfg(feature = "python")]
-impl SQLTableFunction for ReadDeltalakeFunction {
-    fn plan(
-        &self,
-        planner: &SQLPlanner,
-        args: &TableFunctionArgs,
-    ) -> SQLPlannerResult<LogicalPlanBuilder> {
-        let (uri, io_config) = match args.args.as_slice() {
-            [uri] => (uri, None),
-            [uri, io_config] => {
-                let args = planner.parse_function_args(&[io_config.clone()], &["io_config"], 0)?;
-                let io_config = args.get_named("io_config").map(expr_to_iocfg).transpose()?;
-
-                (uri, io_config)
-            }
-            _ => unsupported_sql_err!("Expected one or two arguments"),
-        };
-        let uri = planner.plan_function_arg(uri)?;
-
-        let Some(uri) = uri.as_literal().and_then(|lit| lit.as_str()) else {
-            unsupported_sql_err!("Expected a string literal for the first argument");
-        };
-
-        daft_scan::builder::delta_scan(uri, io_config, true).map_err(From::from)
-    }
-}
-
-#[cfg(not(feature = "python"))]
-impl SQLTableFunction for ReadDeltalakeFunction {
-    fn plan(
-        &self,
-        planner: &SQLPlanner,
-        args: &TableFunctionArgs,
-    ) -> SQLPlannerResult<LogicalPlanBuilder> {
-        unsupported_sql_err!("`read_deltalake` function is not supported. Enable the `python` feature to use this function.")
-    }
-}
44 changes: 44 additions & 0 deletions src/daft-sql/src/table_provider/read_deltalake.rs
@@ -0,0 +1,44 @@
use daft_logical_plan::LogicalPlanBuilder;
use sqlparser::ast::TableFunctionArgs;

use super::{expr_to_iocfg, SQLTableFunction};
use crate::{error::SQLPlannerResult, unsupported_sql_err, SQLPlanner};

pub(super) struct ReadDeltalakeFunction;

#[cfg(feature = "python")]
impl SQLTableFunction for ReadDeltalakeFunction {
    fn plan(
        &self,
        planner: &SQLPlanner,
        args: &TableFunctionArgs,
    ) -> SQLPlannerResult<LogicalPlanBuilder> {
        let (uri, io_config) = match args.args.as_slice() {
            [uri] => (uri, None),
            [uri, io_config] => {
                let args = planner.parse_function_args(&[io_config.clone()], &["io_config"], 0)?;
                let io_config = args.get_named("io_config").map(expr_to_iocfg).transpose()?;
                (uri, io_config)
            }
            _ => unsupported_sql_err!("Expected one or two arguments"),
        };
        let uri = planner.plan_function_arg(uri)?;

        let Some(uri) = uri.as_literal().and_then(|lit| lit.as_str()) else {
            unsupported_sql_err!("Expected a string literal for the first argument");
        };

        daft_scan::builder::delta_scan(uri, io_config, true).map_err(From::from)
    }
}

#[cfg(not(feature = "python"))]
impl SQLTableFunction for ReadDeltalakeFunction {
    fn plan(
        &self,
        planner: &SQLPlanner,
        args: &TableFunctionArgs,
    ) -> SQLPlannerResult<LogicalPlanBuilder> {
        unsupported_sql_err!("`read_deltalake` function is not supported. Enable the `python` feature to use this function.")
    }
}
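The implementation is unchanged by the move out of mod.rs; from SQL the function is still invoked the same way, e.g. (the table URI is hypothetical):

```python
import daft

# The first argument is a string literal URI; io_config may additionally be
# passed as a named argument, per the parsing above.
df = daft.sql("SELECT * FROM read_deltalake('s3://my-bucket/my_delta_table')")
df.show()
```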
74 changes: 74 additions & 0 deletions src/daft-sql/src/table_provider/read_iceberg.rs
@@ -0,0 +1,74 @@
use common_io_config::IOConfig;
use daft_logical_plan::LogicalPlanBuilder;
use sqlparser::ast::TableFunctionArgs;

use super::SQLTableFunction;
use crate::{
error::{PlannerError, SQLPlannerResult},
functions::{self, SQLFunctionArguments},
SQLPlanner,
};

/// The Daft-SQL `read_iceberg` table-valued function.
pub(super) struct SqlReadIceberg;

/// The Daft-SQL `read_iceberg` table-valued function arguments.
struct SqlReadIcebergArgs {
    metadata_location: String,
    snapshot_id: Option<usize>,
    io_config: Option<IOConfig>,
}

impl SqlReadIcebergArgs {
    /// Like a TryFrom<SQLFunctionArguments>, but takes TableFunctionArgs directly and passes the planner.
    fn try_from(planner: &SQLPlanner, args: &TableFunctionArgs) -> SQLPlannerResult<Self> {
        planner.plan_function_args(&args.args, &["snapshot_id", "io_config"], 1)
    }
}

impl TryFrom<SQLFunctionArguments> for SqlReadIcebergArgs {
    type Error = PlannerError;

    /// This is required to use `planner.plan_function_args`.
    fn try_from(args: SQLFunctionArguments) -> Result<Self, Self::Error> {
        let metadata_location: String = args
            .try_get_positional(0)?
            .expect("read_iceberg requires a path");
        let snapshot_id: Option<usize> = args.try_get_named("snapshot_id")?;
        let io_config: Option<IOConfig> = functions::args::parse_io_config(&args)?.into();
        Ok(Self {
            metadata_location,
            snapshot_id,
            io_config,
        })
    }
}

/// Translates the `read_iceberg` table-valued function to a logical scan operator.
#[cfg(feature = "python")]
impl SQLTableFunction for SqlReadIceberg {
    fn plan(
        &self,
        planner: &SQLPlanner,
        args: &TableFunctionArgs,
    ) -> SQLPlannerResult<LogicalPlanBuilder> {
        let args = SqlReadIcebergArgs::try_from(planner, args)?;
        Ok(daft_scan::builder::iceberg_scan(
            args.metadata_location,
            args.snapshot_id,
            args.io_config,
        )?)
    }
}

/// Translates the `read_iceberg` table-valued function to a logical scan operator (errors without the python feature).
#[cfg(not(feature = "python"))]
impl SQLTableFunction for SqlReadIceberg {
    fn plan(
        &self,
        planner: &SQLPlanner,
        args: &TableFunctionArgs,
    ) -> SQLPlannerResult<LogicalPlanBuilder> {
        crate::unsupported_sql_err!("`read_iceberg` function is not supported. Enable the `python` feature to use this function.")
    }
}
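Putting it together: `read_iceberg` takes one required positional `metadata_location`, and `SqlReadIcebergArgs` additionally accepts optional named `snapshot_id` and `io_config` arguments. A minimal sketch of the SQL entry point (the metadata path is hypothetical):

```python
import daft

# Query an Iceberg table from SQL given only its metadata.json location.
df = daft.sql("SELECT * FROM read_iceberg('/tmp/warehouse/events/metadata/v3.metadata.json')")
print(df.collect())
```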
Empty file.
20 changes: 20 additions & 0 deletions tests/sql/test_table_functions/test_read_iceberg.py
@@ -0,0 +1,20 @@
import pytest

import daft


@pytest.mark.skip(
"invoke manually via `uv run tests/sql/test_table_functions/test_read_iceberg.py <metadata_location>`"
)
def test_read_iceberg(metadata_location):
    df = daft.sql(f"SELECT * FROM read_iceberg('{metadata_location}')")
    print(df.collect())


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("usage: test_read_iceberg.py <metadata_location>")
        sys.exit(1)
    test_read_iceberg(metadata_location=sys.argv[1])
File renamed without changes.
