Skip to content

Commit

Permalink
refactor config parsing functions
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan committed Sep 10, 2024
1 parent f4a8ffd commit 42297eb
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 68 deletions.
21 changes: 8 additions & 13 deletions crates/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use async_recursion::async_recursion;
use bytes::Bytes;
use futures::StreamExt;
use object_store::path::Path as ObjPath;
use object_store::{parse_url, parse_url_opts, ObjectStore};
use object_store::{parse_url_opts, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::ParquetMetaData;
Expand Down Expand Up @@ -101,6 +101,13 @@ impl Storage {
Ok(bytes)
}

/// Reads the whole object located at `absolute_path` and returns its bytes.
///
/// The path is converted directly into an object-store path with
/// `ObjPath::from_absolute_path`; it is not joined onto the storage base URL.
///
/// # Errors
///
/// Returns an error if `absolute_path` cannot be converted into an
/// object-store path, or if fetching or buffering the object fails.
pub async fn get_file_data_from_absolute_path(&self, absolute_path: &str) -> Result<Bytes> {
    let location = ObjPath::from_absolute_path(PathBuf::from(absolute_path))?;
    let payload = self.object_store.get(&location).await?;
    Ok(payload.bytes().await?)
}

pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
Expand Down Expand Up @@ -201,18 +208,6 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result<Ve
Ok(leaf_dirs)
}

/// Reads the full contents of the local file at `location`.
///
/// `location` is converted into a file URL, an object store is resolved for
/// that URL with `parse_url`, and the object addressed by the URL path is
/// fetched and buffered into memory.
///
/// # Errors
///
/// Returns an error (instead of panicking) when:
/// - `location` cannot be turned into a file URL, e.g. because it is not an
///   absolute path;
/// - no object store can be created for the URL;
/// - the URL path is not a valid object-store path;
/// - fetching or buffering the object fails.
pub async fn get_file_data(location: &str) -> Result<Bytes> {
    // `Url::from_file_path` signals failure with `Err(())`, so the message has
    // to be attached here.
    let url = Url::from_file_path(location).map_err(|_| {
        anyhow!(
            "Failed to convert '{}' to a file URL: the path must be absolute",
            location
        )
    })?;
    let (object_store, _) =
        parse_url(&url).map_err(|e| anyhow!("Failed to create storage: {}", e))?;
    let obj_path = ObjPath::from_url_path(url.path())?;
    let result = object_store.get(&obj_path).await?;
    Ok(result.bytes().await?)
}

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
Expand Down
30 changes: 28 additions & 2 deletions crates/core/src/storage/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/

use std::collections::HashMap;
use std::io::{BufRead, BufReader, Cursor};
use std::path::{Path, PathBuf};
use std::str::FromStr;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use url::{ParseError, Url};

pub fn split_filename(filename: &str) -> Result<(String, String)> {
Expand Down Expand Up @@ -80,6 +82,30 @@ pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
std::iter::empty::<(&str, &str)>()
}

/// Parses line-oriented `key<sep>value` configuration text into a map.
///
/// Every line is trimmed; blank lines and lines starting with `#` are ignored.
/// Each remaining line is split once, at the first character that occurs in
/// `split_chars`. The text before that character becomes the key and the text
/// after it becomes the value, both trimmed. A line without any separator maps
/// the whole line to an empty value. When a key occurs several times, the last
/// occurrence wins.
///
/// # Errors
///
/// Returns an error if a line cannot be read from `data` (for example when it
/// is not valid UTF-8).
pub async fn parse_config_data(data: &Bytes, split_chars: &str) -> Result<HashMap<String, String>> {
    let mut parsed: HashMap<String, String> = HashMap::new();

    for raw in BufReader::new(Cursor::new(data)).lines() {
        let raw = raw.context("Failed to read line")?;
        let entry = raw.trim();
        let skip = entry.is_empty() || entry.starts_with('#');
        if !skip {
            let mut pieces = entry.splitn(2, |ch: char| split_chars.contains(ch));
            let key = pieces
                .next()
                .context("Missing key in config line")?
                .trim();
            let value = pieces.next().map_or("", str::trim);
            parsed.insert(key.to_owned(), value.to_owned());
        }
    }

    Ok(parsed)
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand Down
108 changes: 55 additions & 53 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

use std::collections::HashMap;
use std::env;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use bytes::Bytes;
use strum::IntoEnumIterator;
use url::Url;

Expand All @@ -42,8 +40,8 @@ use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::config::HUDI_CONF_DIR;
use crate::file_group::FileSlice;
use crate::storage::utils::{empty_options, parse_uri};
use crate::storage::{get_file_data, Storage};
use crate::storage::utils::{empty_options, parse_config_data, parse_uri};
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::timeline::Timeline;

Expand Down Expand Up @@ -96,42 +94,6 @@ impl Table {
})
}

async fn parse_config_file(
data: &Bytes,
split_chars: &str,
hudi_options: &mut HashMap<String, String>,
) {
let cursor = std::io::Cursor::new(data);
let lines = BufReader::new(cursor).lines();
for line in lines {
let line = line.unwrap();
let trimmed_line = line.trim();
if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
continue;
}
let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c));
let key = parts.next().unwrap().to_owned();
let value = parts.next().unwrap_or("").trim().to_owned();
hudi_options.insert(key, value);
}
}

/// Loads the raw bytes of `hudi-defaults.conf` from the directory given by the
/// environment variable named by `HUDI_CONF_DIR`.
///
/// Returns `None` when the environment variable cannot be read or when the
/// file cannot be loaded; the underlying error is discarded in both cases.
async fn get_global_config_data() -> Option<Bytes> {
    let conf_dir = env::var(HUDI_CONF_DIR).ok()?;
    let conf_file = PathBuf::new().join(conf_dir).join("hudi-defaults.conf");
    // Both path components are Rust strings, so the joined path is valid UTF-8.
    let location = conf_file.to_str().unwrap();
    get_file_data(location).await.ok()
}

#[cfg(feature = "datafusion")]
pub fn register_storage(
&self,
Expand Down Expand Up @@ -167,27 +129,67 @@ impl Table {
}
}

let global_config_data = Self::get_global_config_data().await;
if let Some(global_config_data) = global_config_data {
Self::parse_config_file(&global_config_data, " \t=", &mut hudi_options).await;
}

let storage = Storage::new(base_url, &extra_options)?;
let hoodie_properties_data = storage.get_file_data(".hoodie/hoodie.properties").await?;
Self::parse_config_file(&hoodie_properties_data, "=", &mut hudi_options).await;

Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?;

Self::imbue_global_hudi_configs(&mut hudi_options, storage.clone()).await?;

let hudi_configs = HudiConfigs::new(hudi_options);

Self::validate_configs(&hudi_configs).map(|_| (hudi_configs, extra_options))
}

/// Fills `options` with cloud-provider settings taken from the process
/// environment.
///
/// Every environment variable whose name starts with `AWS_`, `AZURE_` or
/// `GOOGLE_` is added under its ASCII-lowercased name. Entries that already
/// exist in `options` are left untouched, so explicitly supplied options take
/// precedence over the environment.
///
/// Environment entries whose name or value is not valid Unicode are skipped.
fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) {
    const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"];

    // `env::vars()` panics if any variable in the environment is not valid
    // Unicode, even an unrelated one; walk the raw entries and drop those.
    let cloud_vars = env::vars_os()
        .filter_map(|(key, value)| Some((key.into_string().ok()?, value.into_string().ok()?)))
        .filter(|(key, _)| PREFIXES.iter().any(|prefix| key.starts_with(*prefix)));

    for (key, value) in cloud_vars {
        // One lookup and one lowercase conversion per variable; an existing
        // entry wins over the environment.
        options.entry(key.to_ascii_lowercase()).or_insert(value);
    }
}

/// Reads `.hoodie/hoodie.properties` through `storage` and merges its entries
/// into `options`.
///
/// The file is parsed with `=` as the only key/value separator. An entry from
/// the file replaces any entry with the same key that is already in `options`.
///
/// # Errors
///
/// Returns an error if the properties file cannot be read or parsed.
async fn imbue_table_properties(
    options: &mut HashMap<String, String>,
    storage: Arc<Storage>,
) -> Result<()> {
    let raw_properties = storage.get_file_data(".hoodie/hoodie.properties").await?;
    let table_properties = parse_config_data(&raw_properties, "=").await?;

    // TODO: handle the case where the same key is present in both table properties and options
    options.extend(table_properties);

    Ok(())
}

/// Merges global defaults from `hudi-defaults.conf` into `options`.
///
/// The file is looked up in the directory given by the environment variable
/// named by `HUDI_CONF_DIR`, falling back to `/etc/hudi/conf` when that
/// variable cannot be read. Lines are split on space, tab or `=`. Only keys
/// starting with `hoodie.` are considered, and a key already present in
/// `options` is never overwritten.
///
/// Loading is best-effort: if the file cannot be read or parsed, `options` is
/// left unchanged and `Ok(())` is still returned.
async fn imbue_global_hudi_configs(
    options: &mut HashMap<String, String>,
    storage: Arc<Storage>,
) -> Result<()> {
    let conf_dir = match env::var(HUDI_CONF_DIR) {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from("/etc/hudi/conf"),
    };
    let conf_file = conf_dir.join("hudi-defaults.conf");

    // Every path component originates from a Rust string, so `to_str` succeeds.
    let loaded = storage
        .get_file_data_from_absolute_path(conf_file.to_str().unwrap())
        .await;
    let parsed = match loaded {
        Ok(bytes) => parse_config_data(&bytes, " \t=").await,
        // A missing or unreadable global file is not an error.
        Err(_) => return Ok(()),
    };

    if let Ok(global_configs) = parsed {
        let hoodie_entries = global_configs
            .into_iter()
            .filter(|(key, _)| key.starts_with("hoodie."));
        for (key, value) in hoodie_entries {
            options.entry(key).or_insert(value);
        }
    }

    Ok(())
}

fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> {
Expand Down

0 comments on commit 42297eb

Please sign in to comment.