feat: align python table APIs with rust (#267)
xushiyan authored Jan 27, 2025
1 parent 1d08d14 commit 88f9f03
Showing 18 changed files with 386 additions and 74 deletions.
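Before the file-by-file diff, a minimal sketch of what the aligned Python surface looks like. This is an illustration, not code from the commit: only HudiTableBuilder.from_base_uri and read_snapshot appear verbatim in the changed files below; the option accessors are assumed to mirror the Rust Table::hudi_options() / Table::storage_options() added in crates/core/src/table/mod.rs.

# Sketch only: assumes the Python bindings mirror the Rust Table API
# names aligned in this commit; not an excerpt from the diff.
import pyarrow as pa

from hudi import HudiTableBuilder

# Placeholder base URI; any readable Hudi table works here.
table = HudiTableBuilder.from_base_uri("/tmp/hudi_table").build()

# Assumed accessors mirroring Rust's hudi_options()/storage_options().
print(table.hudi_options())
print(table.storage_options())

# read_snapshot() returns Arrow record batches (see demo/table-api-python below).
batches = table.read_snapshot()
arrow_table = pa.Table.from_batches(batches)
print(arrow_table.num_rows)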
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -120,7 +120,7 @@ jobs:
- name: Integration tests
run: |
cd demo
-./run_app.sh
+./run_demo.sh
publish-coverage:
name: Publish coverage reports to codecov.io
2 changes: 1 addition & 1 deletion crates/core/src/file_group/reader.rs
@@ -121,7 +121,7 @@ impl FileGroupReader {
.map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}")))
}

-pub async fn read_file_slice(
+pub(crate) async fn read_file_slice(
&self,
file_slice: &FileSlice,
base_file_only: bool,
88 changes: 65 additions & 23 deletions crates/core/src/table/mod.rs
@@ -140,6 +140,27 @@ impl Table {
.await
}

+pub fn hudi_options(&self) -> HashMap<String, String> {
+self.hudi_configs.as_options()
+}
+
+pub fn storage_options(&self) -> HashMap<String, String> {
+self.storage_options.as_ref().clone()
+}
+
+#[cfg(feature = "datafusion")]
+pub fn register_storage(
+&self,
+runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
+) {
+self.timeline
+.storage
+.register_object_store(runtime_env.clone());
+self.file_system_view
+.storage
+.register_object_store(runtime_env.clone());
+}
+
pub fn base_url(&self) -> Url {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
self.hudi_configs
@@ -165,27 +186,6 @@ impl Table {
.to::<String>()
}

-pub fn hudi_options(&self) -> HashMap<String, String> {
-self.hudi_configs.as_options()
-}
-
-pub fn storage_options(&self) -> HashMap<String, String> {
-self.storage_options.as_ref().clone()
-}
-
-#[cfg(feature = "datafusion")]
-pub fn register_storage(
-&self,
-runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
-) {
-self.timeline
-.storage
-.register_object_store(runtime_env.clone());
-self.file_system_view
-.storage
-.register_object_store(runtime_env.clone());
-}
-
/// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
@@ -222,7 +222,36 @@ impl Table {
n: usize,
filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
-let file_slices = self.get_file_slices(filters).await?;
+let filters = from_str_tuples(filters)?;
+if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
+self.get_file_slices_splits_internal(n, timestamp.to::<String>().as_str(), &filters)
+.await
+} else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+self.get_file_slices_splits_internal(n, timestamp, &filters)
+.await
+} else {
+Ok(Vec::new())
+}
+}
+
+pub async fn get_file_slices_splits_as_of(
+&self,
+n: usize,
+timestamp: &str,
+filters: &[(&str, &str, &str)],
+) -> Result<Vec<Vec<FileSlice>>> {
+let filters = from_str_tuples(filters)?;
+self.get_file_slices_splits_internal(n, timestamp, &filters)
+.await
+}
+
+async fn get_file_slices_splits_internal(
+&self,
+n: usize,
+timestamp: &str,
+filters: &[Filter],
+) -> Result<Vec<Vec<FileSlice>>> {
+let file_slices = self.get_file_slices_internal(timestamp, filters).await?;
if file_slices.is_empty() {
return Ok(Vec::new());
}
@@ -282,7 +311,7 @@ impl Table {
)
}

-pub fn create_file_group_reader_with_filters(
+fn create_file_group_reader_with_filters(
&self,
filters: &[(&str, &str, &str)],
schema: &Schema,
@@ -797,6 +826,19 @@ mod tests {
assert_eq!(file_slices_splits[1].len(), 1);
}

+#[tokio::test]
+async fn hudi_table_get_file_slices_splits_as_of() {
+let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
+
+let hudi_table = Table::new(base_url.path()).await.unwrap();
+let file_slices_splits = hudi_table
+.get_file_slices_splits_as_of(2, "20250121000702475", &[])
+.await
+.unwrap();
+assert_eq!(file_slices_splits.len(), 1);
+assert_eq!(file_slices_splits[0].len(), 1);
+}
+
#[tokio::test]
async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
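Net effect of the mod.rs changes above: get_file_slices_splits now resolves the timestamp itself (the AsOfTimestamp config if set, otherwise the latest commit on the timeline), and the new public get_file_slices_splits_as_of pins an explicit timestamp; both delegate to the private get_file_slices_splits_internal. A usage sketch against those signatures, assuming the crate exposes hudi::table::Table and an error type compatible with Box<dyn std::error::Error>; the table path is a placeholder and the timestamp is the one used in the test above.

use hudi::table::Table;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder path; point this at any readable Hudi table.
    let table = Table::new("/tmp/hudi_table").await?;

    // Timestamp resolved internally: AsOfTimestamp config if set,
    // otherwise the latest commit on the timeline.
    let splits = table.get_file_slices_splits(2, &[]).await?;

    // Pinned to an explicit commit timestamp instead.
    let splits_as_of = table
        .get_file_slices_splits_as_of(2, "20250121000702475", &[])
        .await?;

    println!("{} / {} split groups", splits.len(), splits_as_of.len());
    Ok(())
}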
File renamed without changes.
2 changes: 1 addition & 1 deletion demo/compose.yaml
@@ -64,5 +64,5 @@ services:
AWS_REGION: us-east-1 # minio default

networks:
-app_network:
+demo_network:
driver: bridge
8 changes: 4 additions & 4 deletions demo/run_app.sh → demo/run_demo.sh
@@ -34,11 +34,11 @@ if [ $attempt -eq $max_attempts ]; then
exit 1
fi

-# install dependencies and run the app
+# install dependencies and run the demo apps
docker compose exec -T runner /bin/bash -c "
cd /opt/hudi-rs/python && \
make setup develop && \
-cd /opt/hudi-rs/demo/app && \
-cargo run --manifest-path=rust/Cargo.toml && \
-python -m python.src.main
+cd /opt/hudi-rs/demo/sql-datafusion && ./run.sh && \
+cd /opt/hudi-rs/demo/table-api-python && ./run.sh && \
+cd /opt/hudi-rs/demo/table-api-rust && ./run.sh
"
4 changes: 2 additions & 2 deletions demo/app/rust/Cargo.toml → demo/sql-datafusion/Cargo.toml
@@ -19,11 +19,11 @@
# keep this empty such that it won't be linked to the repo workspace

[package]
name = "app"
name = "demo-sql-datafusion"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = "^1"
datafusion = "^43"
hudi = { path = "../../../crates/hudi", features = ["datafusion"] }
hudi = { path = "../../crates/hudi", features = ["datafusion"] }
21 changes: 21 additions & 0 deletions demo/sql-datafusion/run.sh
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

cargo run
66 changes: 66 additions & 0 deletions demo/sql-datafusion/src/main.rs
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{DataFrame, SessionContext};
use hudi::HudiDataSource;

#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?;
ctx.register_table("cow_v6_table", Arc::new(hudi))?;
let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?;
assert_eq!(
df.schema()
.columns()
.iter()
.map(|c| c.name())
.collect::<Vec<_>>(),
vec![
"_hoodie_commit_time",
"_hoodie_commit_seqno",
"_hoodie_record_key",
"_hoodie_partition_path",
"_hoodie_file_name",
"id",
"name",
"isActive",
"intField",
"longField",
"floatField",
"doubleField",
"decimalField",
"dateField",
"timestampField",
"binaryField",
"arrayField",
"mapField",
"structField",
"byteField",
"shortField",
]
);
assert_eq!(df.count().await?, 4);

println!("SQL (DataFusion): read snapshot successfully!");
Ok(())
}
21 changes: 21 additions & 0 deletions demo/table-api-python/run.sh
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

python -m src.main
File renamed without changes.
@@ -15,17 +15,18 @@
# specific language governing permissions and limitations
# under the License.

-from hudi import HudiTableBuilder
import pyarrow as pa
+
+from hudi import HudiTableBuilder

for url in [
"s3://hudi-demo/cow/v6_complexkeygen_hivestyle",
"s3://hudi-demo/mor/v6_complexkeygen_hivestyle",
]:
hudi_table = HudiTableBuilder.from_base_uri(url).build()
-records = hudi_table.read_snapshot()
+batches = hudi_table.read_snapshot()

-arrow_table = pa.Table.from_batches(records)
+arrow_table = pa.Table.from_batches(batches)
assert arrow_table.schema.names == [
"_hoodie_commit_time",
"_hoodie_commit_seqno",
@@ -51,4 +52,4 @@
]
assert arrow_table.num_rows == 4

print("Python API: read snapshot successfully!")
print("Table API (Python): read snapshot successfully!")
30 changes: 30 additions & 0 deletions demo/table-api-rust/Cargo.toml
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[workspace]
# keep this empty such that it won't be linked to the repo workspace

[package]
name = "demo-table-api-rust"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = "^1"
arrow = { version = "= 53.3.0", features = ["pyarrow"] }

hudi = { path = "../../crates/hudi" }
21 changes: 21 additions & 0 deletions demo/table-api-rust/run.sh
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

cargo run