diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 685aec47..847a2f57 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -42,6 +42,16 @@ impl Filter {
     }
 }
 
+impl From<Filter> for (String, String, String) {
+    fn from(filter: Filter) -> Self {
+        (
+            filter.field_name,
+            filter.operator.to_string(),
+            filter.field_value,
+        )
+    }
+}
+
 impl TryFrom<(&str, &str, &str)> for Filter {
     type Error = CoreError;
 
@@ -62,6 +72,10 @@ impl TryFrom<(&str, &str, &str)> for Filter {
     }
 }
 
+pub fn from_str_tuples(tuples: &[(&str, &str, &str)]) -> Result<Vec<Filter>> {
+    tuples.iter().map(|t| Filter::try_from(*t)).collect()
+}
+
 pub struct FilterField {
     pub name: String,
 }
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 6b875d2b..2a0610ec 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -44,7 +44,7 @@ pub struct FileGroupReader {
 }
 
 impl FileGroupReader {
-    pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+    pub(crate) fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
         Self {
             storage,
             hudi_configs,
@@ -52,7 +52,7 @@ impl FileGroupReader {
         }
     }
 
-    pub fn new_with_filters(
+    pub(crate) fn new_with_filters(
         storage: Arc<Storage>,
         hudi_configs: Arc<HudiConfigs>,
         and_filters: &[Filter],
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 5154a041..0a163b8c 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -94,7 +94,7 @@ use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
 use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
-use crate::expr::filter::{Filter, FilterField};
+use crate::expr::filter::{from_str_tuples, Filter};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
 use crate::table::builder::TableBuilder;
@@ -140,7 +140,6 @@ impl Table {
         .await
     }
 
-    #[inline]
     pub fn base_url(&self) -> Url {
         let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
         self.hudi_configs
@@ -150,7 +149,6 @@ impl Table {
             .expect(&err_msg)
     }
 
-    #[inline]
     pub fn table_type(&self) -> TableTypeValue {
         let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
         let table_type = self
@@ -161,7 +159,6 @@ impl Table {
         TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
     }
 
-    #[inline]
     pub fn timezone(&self) -> String {
         self.hudi_configs
             .get_or_default(HudiTableConfig::TimelineTimezone)
@@ -223,7 +220,7 @@ impl Table {
     pub async fn get_file_slices_splits(
         &self,
         n: usize,
-        filters: &[Filter],
+        filters: &[(&str, &str, &str)],
     ) -> Result<Vec<Vec<FileSlice>>> {
        let file_slices = self.get_file_slices(filters).await?;
        if file_slices.is_empty() {
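The tuple-based signatures above rest on the conversions added in filter.rs. A minimal caller-side sketch, assuming the `hudi_core` module paths shown in this patch and that `ExprOperator`'s `Display` output matches the token it was parsed from:

use hudi_core::expr::filter::{from_str_tuples, Filter};

fn tuple_filter_round_trip() {
    // from_str_tuples validates each (field, operator, value) tuple into a Filter.
    let filters = from_str_tuples(&[("byteField", ">=", "10"), ("byteField", "<", "30")])
        .expect("tuples parse into Filters");

    // The new From impl flattens a Filter back into owned (field, operator, value) strings.
    let first: Filter = filters.into_iter().next().unwrap();
    let (field, operator, value): (String, String, String) = first.into();
    assert_eq!((field.as_str(), value.as_str()), ("byteField", "10"));
    assert!(!operator.is_empty());
}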
@@ -242,19 +239,29 @@
     /// Get all the [FileSlice]s in the table.
     ///
     /// If the [AsOfTimestamp] configuration is set, the file slices at the specified timestamp will be returned.
-    pub async fn get_file_slices(&self, filters: &[Filter]) -> Result<Vec<FileSlice>> {
+    pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<FileSlice>> {
+        let filters = from_str_tuples(filters)?;
         if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.get_file_slices_as_of(timestamp.to::<String>().as_str(), filters)
+            self.get_file_slices_internal(timestamp.to::<String>().as_str(), &filters)
                 .await
         } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
-            self.get_file_slices_as_of(timestamp, filters).await
+            self.get_file_slices_internal(timestamp, &filters).await
         } else {
             Ok(Vec::new())
         }
     }
 
     /// Get all the [FileSlice]s at a given timestamp, as a time travel query.
-    async fn get_file_slices_as_of(
+    pub async fn get_file_slices_as_of(
+        &self,
+        timestamp: &str,
+        filters: &[(&str, &str, &str)],
+    ) -> Result<Vec<FileSlice>> {
+        let filters = from_str_tuples(filters)?;
+        self.get_file_slices_internal(timestamp, &filters).await
+    }
+
+    async fn get_file_slices_internal(
         &self,
         timestamp: &str,
         filters: &[Filter],
@@ -277,13 +284,14 @@
 
     pub fn create_file_group_reader_with_filters(
         &self,
-        filters: &[Filter],
+        filters: &[(&str, &str, &str)],
         schema: &Schema,
     ) -> Result<FileGroupReader> {
+        let filters = from_str_tuples(filters)?;
         FileGroupReader::new_with_filters(
             self.file_system_view.storage.clone(),
             self.hudi_configs.clone(),
-            filters,
+            &filters,
             schema,
         )
     }
@@ -291,21 +299,22 @@ impl Table {
     /// Get all the latest records in the table.
     ///
     /// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
-    pub async fn read_snapshot(&self, filters: &[Filter]) -> Result<Vec<RecordBatch>> {
+    pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<RecordBatch>> {
+        let filters = from_str_tuples(filters)?;
         let read_optimized_mode = self
             .hudi_configs
             .get_or_default(UseReadOptimizedMode)
             .to::<bool>();
 
         if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.read_snapshot_as_of(
+            self.read_snapshot_internal(
                 timestamp.to::<String>().as_str(),
-                filters,
+                &filters,
                 read_optimized_mode,
             )
             .await
         } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
-            self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)
+            self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
                 .await
         } else {
             Ok(Vec::new())
@@ -313,13 +322,27 @@ impl Table {
     }
 
     /// Get all the records in the table at a given timestamp, as a time travel query.
-    async fn read_snapshot_as_of(
+    pub async fn read_snapshot_as_of(
+        &self,
+        timestamp: &str,
+        filters: &[(&str, &str, &str)],
+    ) -> Result<Vec<RecordBatch>> {
+        let filters = from_str_tuples(filters)?;
+        let read_optimized_mode = self
+            .hudi_configs
+            .get_or_default(UseReadOptimizedMode)
+            .to::<bool>();
+        self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
+            .await
+    }
+
+    async fn read_snapshot_internal(
         &self,
         timestamp: &str,
         filters: &[Filter],
         read_optimized_mode: bool,
     ) -> Result<Vec<RecordBatch>> {
-        let file_slices = self.get_file_slices_as_of(timestamp, filters).await?;
+        let file_slices = self.get_file_slices_internal(timestamp, filters).await?;
         let fg_reader = self.create_file_group_reader();
         let base_file_only =
             read_optimized_mode || self.table_type() == TableTypeValue::CopyOnWrite;
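From the caller's side, the snapshot APIs now take string tuples directly. A sketch under the assumption of an async context and a local table path; the timestamp is the one used in the tests below:

use hudi_core::table::Table;

async fn snapshot_reads_sketch() {
    let table = Table::new("/tmp/hudi_table").await.unwrap();

    // Latest snapshot, filtered with plain string tuples instead of Filter values.
    let _latest = table
        .read_snapshot(&[("byteField", ">=", "10"), ("shortField", "!=", "100")])
        .await
        .unwrap();

    // Time-travel read; read-optimized mode now comes from the UseReadOptimizedMode
    // table option rather than a boolean argument on this method.
    let _as_of = table
        .read_snapshot_as_of("20240418173551906", &[])
        .await
        .unwrap();
}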
@@ -334,7 +357,9 @@
         Ok(batches)
     }
 
-    /// Get records that were inserted or updated between the given timestamps. Records that were updated multiple times should have their latest states within the time span being returned.
+    /// Get records that were inserted or updated between the given timestamps.
+    /// Records that were updated multiple times should have their latest states within
+    /// the time span being returned.
     pub async fn read_incremental_records(
         &self,
         start_timestamp: &str,
@@ -361,11 +386,11 @@
 
         // Read incremental records from the file slices.
         let filters = &[
-            FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
-            FilterField::new(MetaField::CommitTime.as_ref()).lte(end_timestamp),
+            (MetaField::CommitTime.as_ref(), ">", start_timestamp),
+            (MetaField::CommitTime.as_ref(), "<=", end_timestamp),
         ];
-        let fg_reader =
-            self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
+        let schema = MetaField::schema();
+        let fg_reader = self.create_file_group_reader_with_filters(filters, &schema)?;
 
         // Read-optimized mode does not apply to incremental query semantics.
         let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
@@ -381,7 +406,9 @@
         Ok(batches)
     }
 
-    /// Get the change-data-capture (CDC) records between the given timestamps. The CDC records should reflect the records that were inserted, updated, and deleted between the timestamps.
+    /// Get the change-data-capture (CDC) records between the given timestamps.
+    /// The CDC records should reflect the records that were inserted, updated, and deleted
+    /// between the timestamps.
     #[allow(dead_code)]
     async fn read_incremental_changes(
         &self,
@@ -405,7 +432,6 @@ mod tests {
     use crate::config::HUDI_CONF_DIR;
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
-    use crate::table::Filter;
     use hudi_test::SampleTable;
     use std::collections::HashSet;
     use std::fs::canonicalize;
@@ -431,7 +457,10 @@ mod tests {
     }
 
     /// Test helper to get relative file paths from the table with filters.
-    async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
+    async fn get_file_paths_with_filters(
+        table: &Table,
+        filters: &[(&str, &str, &str)],
+    ) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
         let base_url = table.base_url();
         for f in table.get_file_slices(filters).await?
{ @@ -783,11 +812,11 @@ mod tests { ); // as of the latest timestamp - let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")]; - let hudi_table = Table::new_with_options(base_url.path(), opts) + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let file_slices = hudi_table + .get_file_slices_as_of("20240418173551906", &[]) .await .unwrap(); - let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); assert_eq!( file_slices .iter() @@ -847,11 +876,8 @@ mod tests { .collect::>(); assert_eq!(actual, expected); - let filter_ge_10 = Filter::try_from(("byteField", ">=", "10")).unwrap(); - - let filter_lt_30 = Filter::try_from(("byteField", "<", "30")).unwrap(); - - let actual = get_file_paths_with_filters(&hudi_table, &[filter_ge_10, filter_lt_30]) + let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")]; + let actual = get_file_paths_with_filters(&hudi_table, &filters) .await .unwrap() .into_iter() @@ -865,8 +891,7 @@ mod tests { .collect::>(); assert_eq!(actual, expected); - let filter_gt_30 = Filter::try_from(("byteField", ">", "30")).unwrap(); - let actual = get_file_paths_with_filters(&hudi_table, &[filter_gt_30]) + let actual = get_file_paths_with_filters(&hudi_table, &[("byteField", ">", "30")]) .await .unwrap() .into_iter() @@ -897,16 +922,16 @@ mod tests { .collect::>(); assert_eq!(actual, expected); - let filter_gte_10 = Filter::try_from(("byteField", ">=", "10")).unwrap(); - let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap(); - let filter_ne_100 = Filter::try_from(("shortField", "!=", "100")).unwrap(); - - let actual = - get_file_paths_with_filters(&hudi_table, &[filter_gte_10, filter_lt_20, filter_ne_100]) - .await - .unwrap() - .into_iter() - .collect::>(); + let filters = [ + ("byteField", ">=", "10"), + ("byteField", "<", "20"), + ("shortField", "!=", "100"), + ]; + let actual = get_file_paths_with_filters(&hudi_table, &filters) + .await + .unwrap() + .into_iter() + .collect::>(); let expected = [ "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet", ] @@ -914,10 +939,9 @@ mod tests { .into_iter() .collect::>(); assert_eq!(actual, expected); - let filter_lt_20 = Filter::try_from(("byteField", ">", "20")).unwrap(); - let filter_eq_300 = Filter::try_from(("shortField", "=", "300")).unwrap(); - let actual = get_file_paths_with_filters(&hudi_table, &[filter_lt_20, filter_eq_300]) + let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")]; + let actual = get_file_paths_with_filters(&hudi_table, &filters) .await .unwrap() .into_iter() @@ -966,7 +990,9 @@ mod tests { #[tokio::test] async fn test_non_partitioned_read_optimized() -> Result<()> { let base_url = SampleTable::V6Nonpartitioned.url_to_mor(); - let hudi_table = Table::new(base_url.path()).await?; + let hudi_table = + Table::new_with_options(base_url.path(), [(UseReadOptimizedMode.as_ref(), "true")]) + .await?; let commit_timestamps = hudi_table .timeline .completed_commits @@ -974,9 +1000,7 @@ mod tests { .map(|i| i.timestamp.as_str()) .collect::>(); let latest_commit = commit_timestamps.last().unwrap(); - let records = hudi_table - .read_snapshot_as_of(latest_commit, &[], true) - .await?; + let records = hudi_table.read_snapshot_as_of(latest_commit, &[]).await?; let schema = &records[0].schema(); let records = concat_batches(schema, &records)?; @@ -1019,9 +1043,9 @@ mod tests { let hudi_table = Table::new(base_url.path()).await?; let filters = &[ - Filter::try_from(("byteField", ">=", "10"))?, - 
Filter::try_from(("byteField", "<", "20"))?, - Filter::try_from(("shortField", "!=", "100"))?, + ("byteField", ">=", "10"), + ("byteField", "<", "20"), + ("shortField", "!=", "100"), ]; let records = hudi_table.read_snapshot(filters).await?; let schema = &records[0].schema(); @@ -1044,9 +1068,7 @@ mod tests { .map(|i| i.timestamp.as_str()) .collect::>(); let first_commit = commit_timestamps[0]; - let records = hudi_table - .read_snapshot_as_of(first_commit, &[], false) - .await?; + let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?; let schema = &records[0].schema(); let records = concat_batches(schema, &records)?; diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index 84fa313e..21306c92 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -18,19 +18,17 @@ */ pub mod arrow; -pub fn convert_vec_to_slice(vec: &[(String, String, String)]) -> Vec<(&str, &str, &str)> { - vec.iter() - .map(|(a, b, c)| (a.as_str(), b.as_str(), c.as_str())) - .collect() +pub trait StrTupleRef { + fn as_strs(&self) -> Vec<(&str, &str, &str)>; } -#[macro_export] -macro_rules! vec_to_slice { - ($vec:expr) => { - &convert_vec_to_slice(&$vec)[..] - }; +impl StrTupleRef for Vec<(String, String, String)> { + fn as_strs(&self) -> Vec<(&str, &str, &str)> { + self.iter() + .map(|(s1, s2, s3)| (s1.as_str(), s2.as_str(), s3.as_str())) + .collect() + } } -pub use vec_to_slice; #[cfg(test)] mod tests { @@ -50,11 +48,11 @@ mod tests { String::from("baz"), ), ]; - - let expected_slice = vec![("date", "=", "2022-01-02"), ("foo", "bar", "baz")]; - - let result = vec_to_slice!(&vec_of_strings); - - assert_eq!(result, expected_slice); + let binding = vec_of_strings.as_strs(); + let str_slice = &binding[..]; + assert_eq!( + str_slice, + [("date", "=", "2022-01-02"), ("foo", "bar", "baz")] + ); } } diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 07ab3f33..50a7a053 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -47,6 +47,7 @@ use hudi_core::config::read::HudiReadConfig::InputPartitions; use hudi_core::config::util::empty_options; use hudi_core::storage::util::{get_scheme_authority, join_url_segments}; use hudi_core::table::Table as HudiTable; +use hudi_core::util::StrTupleRef; /// Create a `HudiDataSource`. /// Used for Datafusion to query Hudi tables @@ -178,7 +179,7 @@ impl TableProvider for HudiDataSource { let pushdown_filters = exprs_to_filters(filters); let file_slices = self .table - .get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice()) + .get_file_slices_splits(self.get_input_partitions(), &pushdown_filters.as_strs()) .await .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?; let base_url = self.table.base_url(); diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs index 3a8c94b0..7eb0eed6 100644 --- a/crates/datafusion/src/util/expr.rs +++ b/crates/datafusion/src/util/expr.rs @@ -34,7 +34,7 @@ use hudi_core::expr::filter::{col, Filter as HudiFilter}; /// otherwise returns `None`. 
diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs
index 3a8c94b0..7eb0eed6 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -34,7 +34,7 @@ use hudi_core::expr::filter::{col, Filter as HudiFilter};
 /// otherwise returns `None`.
 ///
 /// TODO: Handle other DataFusion [`Expr`]
-pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<HudiFilter> {
+pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> {
     exprs
         .iter()
         .filter_map(|expr| match expr {
@@ -42,6 +42,7 @@ pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<HudiFilter> {
             Expr::Not(not_expr) => not_expr_to_filter(not_expr),
             _ => None,
         })
+        .map(|filter| filter.into())
         .collect()
 }
 
@@ -111,10 +112,14 @@ mod tests {
             operator: ExprOperator::Eq,
             field_value: "42".to_string(),
         };
-
-        assert_eq!(result[0].field_name, expected_filter.field_name);
-        assert_eq!(result[0].operator, expected_filter.operator);
-        assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
+        assert_eq!(
+            result[0],
+            (
+                expected_filter.field_name,
+                expected_filter.operator.to_string(),
+                expected_filter.field_value
+            )
+        );
     }
 
     #[test]
@@ -139,10 +144,14 @@ mod tests {
             operator: ExprOperator::Ne,
             field_value: "42".to_string(),
         };
-
-        assert_eq!(result[0].field_name, expected_filter.field_name);
-        assert_eq!(result[0].operator, expected_filter.operator);
-        assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
+        assert_eq!(
+            result[0],
+            (
+                expected_filter.field_name,
+                expected_filter.operator.to_string(),
+                expected_filter.field_value
+            )
+        );
     }
 
     #[test]
@@ -193,9 +202,14 @@ mod tests {
         assert_eq!(result.len(), expected_filters.len());
 
         for (result, expected_filter) in result.iter().zip(expected_filters.iter()) {
-            assert_eq!(result.field_name, expected_filter.field_name);
-            assert_eq!(result.operator, expected_filter.operator);
-            assert_eq!(*result.field_value.clone(), expected_filter.field_value);
+            assert_eq!(
+                result,
+                &(
+                    expected_filter.field_name.clone(),
+                    expected_filter.operator.to_string(),
+                    expected_filter.field_value.clone()
+                )
+            );
         }
     }
 
@@ -229,10 +243,14 @@ mod tests {
             operator: expected_op,
             field_value: String::from("42"),
         };
-
-        assert_eq!(result[0].field_name, expected_filter.field_name);
-        assert_eq!(result[0].operator, expected_filter.operator);
-        assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
+        assert_eq!(
+            result[0],
+            (
+                expected_filter.field_name,
+                expected_filter.operator.to_string(),
+                expected_filter.field_value
+            )
+        );
     }
 }
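A sketch of the conversion path exercised by the tests above, assuming `exprs_to_filters` is in scope and using DataFusion's `col`/`lit` builders; the column names and literals are illustrative only:

use datafusion::prelude::{col, lit};

fn pushdown_sketch() {
    let exprs = vec![col("id").eq(lit(42)), col("age").gt_eq(lit(23))];
    // Each supported binary expression becomes a (field, operator, value) string tuple
    // that the tuple-based hudi_core APIs accept directly.
    let filters: Vec<(String, String, String)> = exprs_to_filters(&exprs);
    assert_eq!(filters.len(), 2);
}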
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 9d300fd2..1bfb953c 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -26,13 +26,13 @@ use arrow::pyarrow::ToPyArrow;
 use tokio::runtime::Runtime;
 
 use hudi::error::CoreError;
-use hudi::expr::filter::Filter;
 use hudi::file_group::file_slice::FileSlice;
 use hudi::file_group::reader::FileGroupReader;
 use hudi::storage::error::StorageError;
 use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
-use pyo3::exceptions::{PyException, PyValueError};
+use hudi::util::StrTupleRef;
+use pyo3::exceptions::PyException;
 use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
 
 create_exception!(_internal, HudiCoreError, PyException);
@@ -195,11 +195,11 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Vec<Vec<FileSlice>>> {
-        let filters = convert_filters(filters)?;
+        let filters = filters.unwrap_or_default();
 
         py.allow_threads(|| {
             let file_slices = rt()
-                .block_on(self.inner.get_file_slices_splits(n, &filters))
+                .block_on(self.inner.get_file_slices_splits(n, &filters.as_strs()))
                 .map_err(PythonError::from)?;
             Ok(file_slices
                 .iter()
@@ -214,11 +214,11 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Vec<FileSlice>> {
-        let filters = convert_filters(filters)?;
+        let filters = filters.unwrap_or_default();
 
         py.allow_threads(|| {
             let file_slices = rt()
-                .block_on(self.inner.get_file_slices(&filters))
+                .block_on(self.inner.get_file_slices(&filters.as_strs()))
                 .map_err(PythonError::from)?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
         })
@@ -235,9 +235,9 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<PyObject> {
-        let filters = convert_filters(filters)?;
+        let filters = filters.unwrap_or_default();
 
-        rt().block_on(self.inner.read_snapshot(&filters))
+        rt().block_on(self.inner.read_snapshot(&filters.as_strs()))
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
@@ -258,21 +258,6 @@ impl HudiTable {
     }
 }
 
-fn convert_filters(filters: Option<Vec<(String, String, String)>>) -> PyResult<Vec<Filter>> {
-    filters
-        .unwrap_or_default()
-        .into_iter()
-        .map(|(field, op, value)| {
-            Filter::try_from((field.as_str(), op.as_str(), value.as_str())).map_err(|e| {
-                PyValueError::new_err(format!(
-                    "Invalid filter ({}, {}, {}): {}",
-                    field, op, value, e
-                ))
-            })
-        })
-        .collect()
-}
-
 #[cfg(not(tarpaulin))]
 #[pyfunction]
 #[pyo3(signature = (base_uri, hudi_options=None, storage_options=None, options=None))]
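With `FileGroupReader::new` and `new_with_filters` now `pub(crate)`, external callers obtain a filtering reader through `Table`. A sketch under the assumption that an Arrow `Schema` for the projected columns is already at hand; the field name and value are illustrative:

use arrow_schema::Schema;
use hudi_core::table::Table;

fn reader_via_table_sketch(table: &Table, schema: &Schema) {
    // Filters are passed as string tuples and validated inside the table API;
    // the returned reader is the same FileGroupReader the snapshot path uses.
    let _reader = table
        .create_file_group_reader_with_filters(&[("city", "=", "san_francisco")], schema)
        .expect("tuples parse into valid Filters");
}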