From 42297eb8f0b4c8c5dda6d3955207739b6f9454ee Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Tue, 10 Sep 2024 01:51:27 -0500 Subject: [PATCH] refactor config parsing functions --- crates/core/src/storage/mod.rs | 21 +++--- crates/core/src/storage/utils.rs | 30 ++++++++- crates/core/src/table/mod.rs | 108 ++++++++++++++++--------------- 3 files changed, 91 insertions(+), 68 deletions(-) diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index ffcf3048..5ddfde4a 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -28,7 +28,7 @@ use async_recursion::async_recursion; use bytes::Bytes; use futures::StreamExt; use object_store::path::Path as ObjPath; -use object_store::{parse_url, parse_url_opts, ObjectStore}; +use object_store::{parse_url_opts, ObjectStore}; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::file::metadata::ParquetMetaData; @@ -101,6 +101,13 @@ impl Storage { Ok(bytes) } + pub async fn get_file_data_from_absolute_path(&self, absolute_path: &str) -> Result { + let obj_path = ObjPath::from_absolute_path(PathBuf::from(absolute_path))?; + let result = self.object_store.get(&obj_path).await?; + let bytes = result.bytes().await?; + Ok(bytes) + } + pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result { let obj_url = join_url_segments(&self.base_url, &[relative_path])?; let obj_path = ObjPath::from_url_path(obj_url.path())?; @@ -201,18 +208,6 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result Result { - let url = Url::from_file_path(location).unwrap(); - match parse_url(&url) { - Ok((object_store, _)) => { - let obj_path = ObjPath::from_url_path(url.path()).unwrap(); - let result = object_store.get(&obj_path).await?; - Ok(result.bytes().await?) - } - Err(e) => Err(anyhow!("Failed to create storage: {}", e)), - } -} - #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs index a38f8134..80c86c67 100644 --- a/crates/core/src/storage/utils.rs +++ b/crates/core/src/storage/utils.rs @@ -16,11 +16,13 @@ * specific language governing permissions and limitations * under the License. */ - +use std::collections::HashMap; +use std::io::{BufRead, BufReader, Cursor}; use std::path::{Path, PathBuf}; use std::str::FromStr; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; +use bytes::Bytes; use url::{ParseError, Url}; pub fn split_filename(filename: &str) -> Result<(String, String)> { @@ -80,6 +82,30 @@ pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> { std::iter::empty::<(&str, &str)>() } +pub async fn parse_config_data(data: &Bytes, split_chars: &str) -> Result> { + let cursor = Cursor::new(data); + let lines = BufReader::new(cursor).lines(); + let mut configs = HashMap::new(); + + for line in lines { + let line = line.context("Failed to read line")?; + let trimmed_line = line.trim(); + if trimmed_line.is_empty() || trimmed_line.starts_with('#') { + continue; + } + let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c)); + let key = parts + .next() + .context("Missing key in config line")? + .trim() + .to_owned(); + let value = parts.next().unwrap_or("").trim().to_owned(); + configs.insert(key, value); + } + + Ok(configs) +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 17e3dca1..82df7ed4 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -19,7 +19,6 @@ use std::collections::HashMap; use std::env; -use std::io::{BufRead, BufReader}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -27,7 +26,6 @@ use std::sync::Arc; use anyhow::{anyhow, Context, Result}; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; -use bytes::Bytes; use strum::IntoEnumIterator; use url::Url; @@ -42,8 +40,8 @@ use crate::config::table::{HudiTableConfig, TableTypeValue}; use crate::config::HudiConfigs; use crate::config::HUDI_CONF_DIR; use crate::file_group::FileSlice; -use crate::storage::utils::{empty_options, parse_uri}; -use crate::storage::{get_file_data, Storage}; +use crate::storage::utils::{empty_options, parse_config_data, parse_uri}; +use crate::storage::Storage; use crate::table::fs_view::FileSystemView; use crate::table::timeline::Timeline; @@ -96,42 +94,6 @@ impl Table { }) } - async fn parse_config_file( - data: &Bytes, - split_chars: &str, - hudi_options: &mut HashMap, - ) { - let cursor = std::io::Cursor::new(data); - let lines = BufReader::new(cursor).lines(); - for line in lines { - let line = line.unwrap(); - let trimmed_line = line.trim(); - if trimmed_line.is_empty() || trimmed_line.starts_with('#') { - continue; - } - let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c)); - let key = parts.next().unwrap().to_owned(); - let value = parts.next().unwrap_or("").trim().to_owned(); - hudi_options.insert(key, value); - } - } - - async fn get_global_config_data() -> Option { - match env::var(HUDI_CONF_DIR) { - Ok(hudi_conf_dir) => { - let path_buf = PathBuf::new() - .join(hudi_conf_dir) - .join("hudi-defaults.conf"); - let file_path = path_buf.to_str().unwrap(); - match get_file_data(file_path).await { - Ok(bytes) => Some(bytes), - Err(_) => None, - } - } - Err(_) => None, - } - } - #[cfg(feature = "datafusion")] pub fn register_storage( &self, @@ -167,14 +129,11 @@ impl Table { } } - let global_config_data = Self::get_global_config_data().await; - if let Some(global_config_data) = global_config_data { - Self::parse_config_file(&global_config_data, " \t=", &mut hudi_options).await; - } - let storage = Storage::new(base_url, &extra_options)?; - let hoodie_properties_data = storage.get_file_data(".hoodie/hoodie.properties").await?; - Self::parse_config_file(&hoodie_properties_data, "=", &mut hudi_options).await; + + Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; + + Self::imbue_global_hudi_configs(&mut hudi_options, storage.clone()).await?; let hudi_configs = HudiConfigs::new(hudi_options); @@ -182,12 +141,55 @@ impl Table { } fn imbue_cloud_env_vars(options: &mut HashMap) { - let prefixes = ["AWS_", "AZURE_", "GOOGLE_"]; - options.extend( - env::vars() - .filter(|(key, _)| prefixes.iter().any(|prefix| key.starts_with(prefix))) - .map(|(k, v)| (k.to_ascii_lowercase(), v)), - ); + const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; + + for (key, value) in env::vars() { + if PREFIXES.iter().any(|prefix| key.starts_with(prefix)) + && !options.contains_key(&key.to_ascii_lowercase()) + { + options.insert(key.to_ascii_lowercase(), value); + } + } + } + + async fn imbue_table_properties( + options: &mut HashMap, + storage: Arc, + ) -> Result<()> { + let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?; + let table_properties = parse_config_data(&bytes, "=").await?; + + // TODO: handle the case where the same key is present in both table properties and options + for (k, v) in table_properties { + options.insert(k.to_string(), v.to_string()); + } + + Ok(()) + } + + async fn imbue_global_hudi_configs( + options: &mut HashMap, + storage: Arc, + ) -> Result<()> { + let global_config_path = env::var(HUDI_CONF_DIR) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf")) + .join("hudi-defaults.conf"); + + if let Ok(bytes) = storage + .get_file_data_from_absolute_path(global_config_path.to_str().unwrap()) + .await + { + if let Ok(global_configs) = parse_config_data(&bytes, " \t=").await { + for (key, value) in global_configs { + if key.starts_with("hoodie.") && !options.contains_key(&key) { + options.insert(key.to_string(), value.to_string()); + } + } + } + } + + Ok(()) } fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> {