refactor: remove use of Filter from public APIs #266

Merged · 2 commits · Jan 26, 2025
14 changes: 14 additions & 0 deletions crates/core/src/expr/filter.rs
@@ -42,6 +42,16 @@ impl Filter {
}
}

impl From<Filter> for (String, String, String) {
fn from(filter: Filter) -> Self {
(
filter.field_name,
filter.operator.to_string(),
filter.field_value,
)
}
}

impl TryFrom<(&str, &str, &str)> for Filter {
type Error = CoreError;

@@ -62,6 +72,10 @@ impl TryFrom<(&str, &str, &str)> for Filter {
}
}

pub fn from_str_tuples(tuples: &[(&str, &str, &str)]) -> Result<Vec<Filter>> {
tuples.iter().map(|t| Filter::try_from(*t)).collect()
}

pub struct FilterField {
pub name: String,
}
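For context, a minimal usage sketch of the conversions added above (not part of the diff; it assumes Filter and from_str_tuples stay exported from hudi_core::expr::filter):

use hudi_core::expr::filter::{from_str_tuples, Filter};

fn filter_round_trip() {
    // Parse string tuples into typed filters; an unknown operator surfaces as an error.
    let mut filters: Vec<Filter> =
        from_str_tuples(&[("byteField", ">=", "10"), ("byteField", "<", "30")])
            .expect("valid filter tuples");

    // The new From impl turns a filter back into its (field, operator, value) strings.
    let (field, op, value): (String, String, String) = filters.remove(0).into();
    assert_eq!(field, "byteField");
    assert_eq!(value, "10");
    println!("operator rendered as {op}");
}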
4 changes: 2 additions & 2 deletions crates/core/src/file_group/reader.rs
@@ -44,15 +44,15 @@ pub struct FileGroupReader {
}

impl FileGroupReader {
pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
pub(crate) fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
Self {
storage,
hudi_configs,
and_filters: Vec::new(),
}
}

pub fn new_with_filters(
pub(crate) fn new_with_filters(
storage: Arc<Storage>,
hudi_configs: Arc<HudiConfigs>,
and_filters: &[Filter],
138 changes: 80 additions & 58 deletions crates/core/src/table/mod.rs
@@ -94,7 +94,7 @@
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::expr::filter::{Filter, FilterField};
use crate::expr::filter::{from_str_tuples, Filter};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
use crate::table::builder::TableBuilder;
@@ -140,7 +140,6 @@
.await
}

#[inline]
pub fn base_url(&self) -> Url {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
self.hudi_configs
@@ -150,7 +149,6 @@
.expect(&err_msg)
}

#[inline]
pub fn table_type(&self) -> TableTypeValue {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
let table_type = self
@@ -161,7 +159,6 @@
TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
}

#[inline]
pub fn timezone(&self) -> String {
self.hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
@@ -223,7 +220,7 @@
pub async fn get_file_slices_splits(
&self,
n: usize,
filters: &[Filter],
filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
let file_slices = self.get_file_slices(filters).await?;
if file_slices.is_empty() {
@@ -242,19 +239,29 @@
/// Get all the [FileSlice]s in the table.
///
/// If the [AsOfTimestamp] configuration is set, the file slices at the specified timestamp will be returned.
pub async fn get_file_slices(&self, filters: &[Filter]) -> Result<Vec<FileSlice>> {
pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<FileSlice>> {
let filters = from_str_tuples(filters)?;
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.get_file_slices_as_of(timestamp.to::<String>().as_str(), filters)
self.get_file_slices_internal(timestamp.to::<String>().as_str(), &filters)
.await
} else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
self.get_file_slices_as_of(timestamp, filters).await
self.get_file_slices_internal(timestamp, &filters).await
} else {
Ok(Vec::new())
}
}

/// Get all the [FileSlice]s at a given timestamp, as a time travel query.
async fn get_file_slices_as_of(
pub async fn get_file_slices_as_of(
&self,
timestamp: &str,
filters: &[(&str, &str, &str)],
) -> Result<Vec<FileSlice>> {
let filters = from_str_tuples(filters)?;
self.get_file_slices_internal(timestamp, &filters).await
}

async fn get_file_slices_internal(
&self,
timestamp: &str,
filters: &[Filter],
@@ -277,49 +284,65 @@

pub fn create_file_group_reader_with_filters(
&self,
filters: &[Filter],
filters: &[(&str, &str, &str)],
schema: &Schema,
) -> Result<FileGroupReader> {
let filters = from_str_tuples(filters)?;
FileGroupReader::new_with_filters(
self.file_system_view.storage.clone(),
self.hudi_configs.clone(),
filters,
&filters,
schema,
)
}

/// Get all the latest records in the table.
///
/// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
pub async fn read_snapshot(&self, filters: &[Filter]) -> Result<Vec<RecordBatch>> {
pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<RecordBatch>> {
let filters = from_str_tuples(filters)?;
let read_optimized_mode = self
.hudi_configs
.get_or_default(UseReadOptimizedMode)
.to::<bool>();

if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.read_snapshot_as_of(
self.read_snapshot_internal(
timestamp.to::<String>().as_str(),
filters,
&filters,
read_optimized_mode,
)
.await
} else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)
self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
.await
} else {
Ok(Vec::new())
}
}

/// Get all the records in the table at a given timestamp, as a time travel query.
async fn read_snapshot_as_of(
pub async fn read_snapshot_as_of(
&self,
timestamp: &str,
filters: &[(&str, &str, &str)],
) -> Result<Vec<RecordBatch>> {
let filters = from_str_tuples(filters)?;
let read_optimized_mode = self
.hudi_configs
.get_or_default(UseReadOptimizedMode)
.to::<bool>();
self.read_snapshot_internal(timestamp, &filters, read_optimized_mode)
.await
}

async fn read_snapshot_internal(
&self,
timestamp: &str,
filters: &[Filter],
read_optimized_mode: bool,
) -> Result<Vec<RecordBatch>> {
let file_slices = self.get_file_slices_as_of(timestamp, filters).await?;
let file_slices = self.get_file_slices_internal(timestamp, filters).await?;
let fg_reader = self.create_file_group_reader();
let base_file_only =
read_optimized_mode || self.table_type() == TableTypeValue::CopyOnWrite;
@@ -334,7 +357,9 @@
Ok(batches)
}

/// Get records that were inserted or updated between the given timestamps. Records that were updated multiple times should have their latest states within the time span being returned.
/// Get records that were inserted or updated between the given timestamps.
/// Records that were updated multiple times should have their latest states within
/// the time span being returned.
pub async fn read_incremental_records(
&self,
start_timestamp: &str,
@@ -361,11 +386,11 @@

// Read incremental records from the file slices.
let filters = &[
FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
FilterField::new(MetaField::CommitTime.as_ref()).lte(end_timestamp),
(MetaField::CommitTime.as_ref(), ">", start_timestamp),
(MetaField::CommitTime.as_ref(), "<=", end_timestamp),
];
let fg_reader =
self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
let schema = MetaField::schema();
let fg_reader = self.create_file_group_reader_with_filters(filters, &schema)?;

// Read-optimized mode does not apply to incremental query semantics.
let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
@@ -381,7 +406,9 @@
Ok(batches)
}

/// Get the change-data-capture (CDC) records between the given timestamps. The CDC records should reflect the records that were inserted, updated, and deleted between the timestamps.
/// Get the change-data-capture (CDC) records between the given timestamps.
/// The CDC records should reflect the records that were inserted, updated, and deleted
/// between the timestamps.
#[allow(dead_code)]
async fn read_incremental_changes(
&self,
@@ -405,7 +432,6 @@
use crate::config::HUDI_CONF_DIR;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
use crate::table::Filter;
use hudi_test::SampleTable;
use std::collections::HashSet;
use std::fs::canonicalize;
@@ -431,7 +457,10 @@
}

/// Test helper to get relative file paths from the table with filters.
async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
async fn get_file_paths_with_filters(
table: &Table,
filters: &[(&str, &str, &str)],
) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
let base_url = table.base_url();
for f in table.get_file_slices(filters).await? {
@@ -783,11 +812,11 @@
);

// as of the latest timestamp
let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
let hudi_table = Table::new_with_options(base_url.path(), opts)
let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices = hudi_table
.get_file_slices_as_of("20240418173551906", &[])
.await
.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
@@ -847,11 +876,8 @@
.collect::<HashSet<_>>();
assert_eq!(actual, expected);

let filter_ge_10 = Filter::try_from(("byteField", ">=", "10")).unwrap();

let filter_lt_30 = Filter::try_from(("byteField", "<", "30")).unwrap();

let actual = get_file_paths_with_filters(&hudi_table, &[filter_ge_10, filter_lt_30])
let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
.await
.unwrap()
.into_iter()
@@ -865,8 +891,7 @@
.collect::<HashSet<_>>();
assert_eq!(actual, expected);

let filter_gt_30 = Filter::try_from(("byteField", ">", "30")).unwrap();
let actual = get_file_paths_with_filters(&hudi_table, &[filter_gt_30])
let actual = get_file_paths_with_filters(&hudi_table, &[("byteField", ">", "30")])
.await
.unwrap()
.into_iter()
@@ -897,27 +922,26 @@
.collect::<HashSet<_>>();
assert_eq!(actual, expected);

let filter_gte_10 = Filter::try_from(("byteField", ">=", "10")).unwrap();
let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
let filter_ne_100 = Filter::try_from(("shortField", "!=", "100")).unwrap();

let actual =
get_file_paths_with_filters(&hudi_table, &[filter_gte_10, filter_lt_20, filter_ne_100])
.await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
let filters = [
("byteField", ">=", "10"),
("byteField", "<", "20"),
("shortField", "!=", "100"),
];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
.await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
let expected = [
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]
.map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(actual, expected);
let filter_lt_20 = Filter::try_from(("byteField", ">", "20")).unwrap();
let filter_eq_300 = Filter::try_from(("shortField", "=", "300")).unwrap();

let actual = get_file_paths_with_filters(&hudi_table, &[filter_lt_20, filter_eq_300])
let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
.await
.unwrap()
.into_iter()
@@ -966,17 +990,17 @@
#[tokio::test]
async fn test_non_partitioned_read_optimized() -> Result<()> {
let base_url = SampleTable::V6Nonpartitioned.url_to_mor();
let hudi_table = Table::new(base_url.path()).await?;
let hudi_table =
Table::new_with_options(base_url.path(), [(UseReadOptimizedMode.as_ref(), "true")])
.await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
.iter()
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let latest_commit = commit_timestamps.last().unwrap();
let records = hudi_table
.read_snapshot_as_of(latest_commit, &[], true)
.await?;
let records = hudi_table.read_snapshot_as_of(latest_commit, &[]).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;

@@ -1019,9 +1043,9 @@
let hudi_table = Table::new(base_url.path()).await?;

let filters = &[
Filter::try_from(("byteField", ">=", "10"))?,
Filter::try_from(("byteField", "<", "20"))?,
Filter::try_from(("shortField", "!=", "100"))?,
("byteField", ">=", "10"),
("byteField", "<", "20"),
("shortField", "!=", "100"),
];
let records = hudi_table.read_snapshot(filters).await?;
let schema = &records[0].schema();
@@ -1044,9 +1068,7 @@
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
let records = hudi_table
.read_snapshot_as_of(first_commit, &[], false)
.await?;
let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
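
Taken together, the reworked Table API accepts plain (field, operator, value) string tuples at every public entry point. A hedged usage sketch, not part of the diff (it assumes the core error type converts into Box<dyn Error>):

use hudi_core::table::Table;

async fn query_example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
    // Snapshot read at the latest commit, with filters given as string tuples.
    let batches = table
        .read_snapshot(&[("byteField", ">=", "10"), ("shortField", "!=", "100")])
        .await?;
    println!("read {} record batches", batches.len());

    // Time-travel variants take a commit timestamp plus the same tuple filters.
    let _snapshot_at = table.read_snapshot_as_of("20240418173551906", &[]).await?;

    // File-slice listing and split planning follow the same convention.
    let _slices = table.get_file_slices(&[("byteField", "<", "30")]).await?;
    let _splits = table.get_file_slices_splits(4, &[]).await?;

    Ok(())
}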

30 changes: 14 additions & 16 deletions crates/core/src/util/mod.rs
@@ -18,19 +18,17 @@
*/
pub mod arrow;

pub fn convert_vec_to_slice(vec: &[(String, String, String)]) -> Vec<(&str, &str, &str)> {
vec.iter()
.map(|(a, b, c)| (a.as_str(), b.as_str(), c.as_str()))
.collect()
pub trait StrTupleRef {
fn as_strs(&self) -> Vec<(&str, &str, &str)>;
}

#[macro_export]
macro_rules! vec_to_slice {
($vec:expr) => {
&convert_vec_to_slice(&$vec)[..]
};
impl StrTupleRef for Vec<(String, String, String)> {
fn as_strs(&self) -> Vec<(&str, &str, &str)> {
self.iter()
.map(|(s1, s2, s3)| (s1.as_str(), s2.as_str(), s3.as_str()))
.collect()
}
}
pub use vec_to_slice;

#[cfg(test)]
mod tests {
@@ -50,11 +48,11 @@ mod tests {
String::from("baz"),
),
];

let expected_slice = vec![("date", "=", "2022-01-02"), ("foo", "bar", "baz")];

let result = vec_to_slice!(&vec_of_strings);

assert_eq!(result, expected_slice);
let binding = vec_of_strings.as_strs();
let str_slice = &binding[..];
assert_eq!(
str_slice,
[("date", "=", "2022-01-02"), ("foo", "bar", "baz")]
);
}
}
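A short sketch of the new StrTupleRef trait that replaces the vec_to_slice! macro (not part of the diff): owned tuples are borrowed as &str tuples so they can be handed straight to the tuple-based table APIs.

use hudi_core::util::StrTupleRef;

fn borrow_owned_filters() {
    let owned: Vec<(String, String, String)> = vec![(
        "date".to_string(),
        "=".to_string(),
        "2022-01-02".to_string(),
    )];

    // as_strs() yields (&str, &str, &str) views borrowed from the owned Strings.
    let borrowed: Vec<(&str, &str, &str)> = owned.as_strs();
    assert_eq!(borrowed, [("date", "=", "2022-01-02")]);
}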
3 changes: 2 additions & 1 deletion crates/datafusion/src/lib.rs
@@ -47,6 +47,7 @@ use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_core::config::util::empty_options;
use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
use hudi_core::table::Table as HudiTable;
use hudi_core::util::StrTupleRef;

/// Create a `HudiDataSource`.
/// Used for Datafusion to query Hudi tables
@@ -178,7 +179,7 @@ impl TableProvider for HudiDataSource {
let pushdown_filters = exprs_to_filters(filters);
let file_slices = self
.table
.get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice())
.get_file_slices_splits(self.get_input_partitions(), &pushdown_filters.as_strs())
.await
.map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
let base_url = self.table.base_url();
50 changes: 34 additions & 16 deletions crates/datafusion/src/util/expr.rs
@@ -34,14 +34,15 @@ use hudi_core::expr::filter::{col, Filter as HudiFilter};
/// otherwise returns `None`.
///
/// TODO: Handle other DataFusion [`Expr`]
pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<HudiFilter> {
pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> {
exprs
.iter()
.filter_map(|expr| match expr {
Expr::BinaryExpr(binary_expr) => binary_expr_to_filter(binary_expr),
Expr::Not(not_expr) => not_expr_to_filter(not_expr),
_ => None,
})
.map(|filter| filter.into())
.collect()
}

@@ -111,10 +112,14 @@ mod tests {
operator: ExprOperator::Eq,
field_value: "42".to_string(),
};

assert_eq!(result[0].field_name, expected_filter.field_name);
assert_eq!(result[0].operator, expected_filter.operator);
assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
assert_eq!(
result[0],
(
expected_filter.field_name,
expected_filter.operator.to_string(),
expected_filter.field_value
)
);
}

#[test]
@@ -139,10 +144,14 @@ mod tests {
operator: ExprOperator::Ne,
field_value: "42".to_string(),
};

assert_eq!(result[0].field_name, expected_filter.field_name);
assert_eq!(result[0].operator, expected_filter.operator);
assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
assert_eq!(
result[0],
(
expected_filter.field_name,
expected_filter.operator.to_string(),
expected_filter.field_value
)
);
}

#[test]
@@ -193,9 +202,14 @@ mod tests {
assert_eq!(result.len(), expected_filters.len());

for (result, expected_filter) in result.iter().zip(expected_filters.iter()) {
assert_eq!(result.field_name, expected_filter.field_name);
assert_eq!(result.operator, expected_filter.operator);
assert_eq!(*result.field_value.clone(), expected_filter.field_value);
assert_eq!(
result,
&(
expected_filter.field_name.clone(),
expected_filter.operator.to_string(),
expected_filter.field_value.clone()
)
);
}
}

@@ -229,10 +243,14 @@ mod tests {
operator: expected_op,
field_value: String::from("42"),
};

assert_eq!(result[0].field_name, expected_filter.field_name);
assert_eq!(result[0].operator, expected_filter.operator);
assert_eq!(*result[0].field_value.clone(), expected_filter.field_value);
assert_eq!(
result[0],
(
expected_filter.field_name,
expected_filter.operator.to_string(),
expected_filter.field_value
)
);
}
}
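For callers inside the datafusion crate, the new exprs_to_filters contract can be exercised roughly like this (a test-style sketch, not part of the diff; the DataFusion expression builders are assumptions and the operator string is left unasserted):

#[test]
fn tuple_contract_sketch() {
    // exprs_to_filters (defined above) now yields owned string tuples.
    let exprs = vec![datafusion::prelude::col("byteField").gt_eq(datafusion::prelude::lit(42))];
    let filters: Vec<(String, String, String)> = exprs_to_filters(&exprs);
    assert_eq!(filters[0].0, "byteField");
    assert_eq!(filters[0].2, "42");
}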

31 changes: 8 additions & 23 deletions python/src/internal.rs
@@ -26,13 +26,13 @@ use arrow::pyarrow::ToPyArrow;
use tokio::runtime::Runtime;

use hudi::error::CoreError;
use hudi::expr::filter::Filter;
use hudi::file_group::file_slice::FileSlice;
use hudi::file_group::reader::FileGroupReader;
use hudi::storage::error::StorageError;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
use pyo3::exceptions::{PyException, PyValueError};
use hudi::util::StrTupleRef;
use pyo3::exceptions::PyException;
use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};

create_exception!(_internal, HudiCoreError, PyException);
@@ -195,11 +195,11 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
let filters = convert_filters(filters)?;
let filters = filters.unwrap_or_default();

py.allow_threads(|| {
let file_slices = rt()
.block_on(self.inner.get_file_slices_splits(n, &filters))
.block_on(self.inner.get_file_slices_splits(n, &filters.as_strs()))
.map_err(PythonError::from)?;
Ok(file_slices
.iter()
@@ -214,11 +214,11 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
let filters = convert_filters(filters)?;
let filters = filters.unwrap_or_default();

py.allow_threads(|| {
let file_slices = rt()
.block_on(self.inner.get_file_slices(&filters))
.block_on(self.inner.get_file_slices(&filters.as_strs()))
.map_err(PythonError::from)?;
Ok(file_slices.iter().map(convert_file_slice).collect())
})
@@ -235,9 +235,9 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
let filters = convert_filters(filters)?;
let filters = filters.unwrap_or_default();

rt().block_on(self.inner.read_snapshot(&filters))
rt().block_on(self.inner.read_snapshot(&filters.as_strs()))
.map_err(PythonError::from)?
.to_pyarrow(py)
}
@@ -258,21 +258,6 @@ impl HudiTable {
}
}

fn convert_filters(filters: Option<Vec<(String, String, String)>>) -> PyResult<Vec<Filter>> {
filters
.unwrap_or_default()
.into_iter()
.map(|(field, op, value)| {
Filter::try_from((field.as_str(), op.as_str(), value.as_str())).map_err(|e| {
PyValueError::new_err(format!(
"Invalid filter ({}, {}, {}): {}",
field, op, value, e
))
})
})
.collect()
}

#[cfg(not(tarpaulin))]
#[pyfunction]
#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None, options=None))]
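
With convert_filters removed, the binding-side flow reduces to defaulting and borrowing. A minimal sketch of that pattern (not part of the diff; field values are illustrative), using the StrTupleRef import shown above:

use hudi::util::StrTupleRef;

fn prepare_filters(
    filters: Option<Vec<(String, String, String)>>,
) -> Vec<(String, String, String)> {
    // Mirrors `filters.unwrap_or_default()` above; invalid operators now surface as
    // CoreError inside the core read APIs instead of a pre-validated PyValueError.
    filters.unwrap_or_default()
}

fn demo() {
    let owned = prepare_filters(Some(vec![(
        "city".to_string(),
        "=".to_string(),
        "san_francisco".to_string(),
    )]));
    // Borrowed view handed to the core APIs, e.g. table.read_snapshot(&owned.as_strs()).
    let borrowed = owned.as_strs();
    assert_eq!(borrowed[0].0, "city");
}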