docs: add hudi core doc
KnightChess committed Aug 18, 2024
1 parent 3359e10 commit ea839a4
Showing 8 changed files with 311 additions and 0 deletions.
18 changes: 18 additions & 0 deletions crates/core/src/config/internal.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
//! Hudi internal configuration
use std::collections::HashMap;
use std::str::FromStr;
@@ -25,6 +26,23 @@ use strum_macros::EnumIter;

use crate::config::{ConfigParser, HudiConfigValue};

/// Hudi internal configuration
///
/// These configs control the internal behavior of read operations.
///
/// **Example**
///
/// ```rust
/// use url::Url;
/// use hudi_core::config::HudiConfigValue;
/// use hudi_core::config::internal::HudiInternalConfig::SkipConfigValidation;
/// use hudi_core::table::Table as HudiTable;
///
/// let options = vec![(SkipConfigValidation.as_ref(), HudiConfigValue::Boolean(true))];
/// let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
/// HudiTable::new_with_options(base_uri.as_ref(), options);
/// ```
///
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiInternalConfig {
SkipConfigValidation,
32 changes: 32 additions & 0 deletions crates/core/src/config/mod.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
//! Hudi Configuration
use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;
@@ -26,15 +27,24 @@ pub mod internal;
pub mod read;
pub mod table;

/// This trait defines how each category of Hudi configuration is parsed and validated.
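///
/// **Example**
///
/// A minimal sketch of a custom implementation, assuming `default_value` and `parse_value`
/// are the only required items; the config key `hoodie.example.skip.cleanup` is hypothetical
/// and used only for illustration.
///
/// ```rust
/// use std::collections::HashMap;
///
/// use anyhow::{anyhow, Result};
/// use hudi_core::config::{ConfigParser, HudiConfigValue};
///
/// struct SkipCleanup;
///
/// impl AsRef<str> for SkipCleanup {
///     fn as_ref(&self) -> &str {
///         "hoodie.example.skip.cleanup"
///     }
/// }
///
/// impl ConfigParser for SkipCleanup {
///     type Output = HudiConfigValue;
///
///     fn default_value(&self) -> Option<Self::Output> {
///         Some(HudiConfigValue::Boolean(false))
///     }
///
///     fn parse_value(&self, configs: &HashMap<String, String>) -> Result<Self::Output> {
///         configs
///             .get(self.as_ref())
///             .ok_or_else(|| anyhow!("config '{}' not found", self.as_ref()))
///             .and_then(|v| v.parse::<bool>().map_err(|e| anyhow!(e)))
///             .map(HudiConfigValue::Boolean)
///     }
/// }
///
/// let configs = HashMap::from([("hoodie.example.skip.cleanup".to_string(), "true".to_string())]);
/// assert!(SkipCleanup.validate(&configs).is_ok());
/// ```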
pub trait ConfigParser: AsRef<str> {
/// Configuration value type
type Output;

/// Returns the default [HudiConfigValue] for this config, if one is defined.
fn default_value(&self) -> Option<Self::Output>;

/// Returns whether the configuration is required.
fn is_required(&self) -> bool {
false
}

/// Validates the config value in the given configs.
///
/// If the value is not provided:
/// - when [`Self::is_required`] is `false`, returns `Ok(())`
/// - when [`Self::is_required`] is `true`, returns an `Err`
fn validate(&self, configs: &HashMap<String, String>) -> Result<()> {
match self.parse_value(configs) {
Ok(_) => Ok(()),
Expand All @@ -49,8 +59,12 @@ pub trait ConfigParser: AsRef<str> {
}
}

/// Parses the source configuration into [`Self::Output`].
fn parse_value(&self, configs: &HashMap<String, String>) -> Result<Self::Output>;

/// Parses the source configuration into [`Self::Output`], falling back to the default value.
///
/// Panics if the value cannot be parsed and no default value is defined.
fn parse_value_or_default(&self, configs: &HashMap<String, String>) -> Self::Output {
self.parse_value(configs).unwrap_or_else(|_| {
self.default_value()
@@ -59,6 +73,9 @@ pub trait ConfigParser: AsRef<str> {
}
}

/// Hudi configuration value type.
///
/// Only `Boolean`, `Integer`, `UInteger`, `String`, and `List` are supported.
#[derive(Clone, Debug)]
pub enum HudiConfigValue {
Boolean(bool),
@@ -69,6 +86,13 @@ pub enum HudiConfigValue {
}

impl HudiConfigValue {
/// Converts the [HudiConfigValue] logical type to the corresponding Rust type (a usage sketch follows the list below):
///
/// - [`HudiConfigValue::Boolean`] -> [bool]
/// - [`HudiConfigValue::Integer`] -> [isize]
/// - [`HudiConfigValue::UInteger`] -> [usize]
/// - [`HudiConfigValue::String`] -> [String]
/// - [`HudiConfigValue::List`] -> [`Vec<String>`]
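///
/// A minimal usage sketch, assuming the `From` conversions implied by the list above
/// are implemented for the corresponding Rust types:
///
/// ```rust
/// use hudi_core::config::HudiConfigValue;
///
/// let value = HudiConfigValue::Boolean(true);
/// let flag: bool = value.to();
/// assert!(flag);
/// ```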
pub fn to<T: 'static + std::fmt::Debug + From<HudiConfigValue>>(self) -> T {
T::from(self)
}
@@ -122,18 +146,21 @@ impl From<HudiConfigValue> for Vec<String> {
}
}

/// Hudi raw configs storage
#[derive(Clone, Debug)]
pub struct HudiConfigs {
pub raw_configs: Arc<HashMap<String, String>>,
}

impl HudiConfigs {
/// Creates a [HudiConfigs] from a [HashMap] of raw configs.
pub fn new(raw_configs: HashMap<String, String>) -> Self {
Self {
raw_configs: Arc::new(raw_configs),
}
}

/// Creates an empty [HudiConfigs].
pub fn empty() -> Self {
Self {
raw_configs: Arc::new(HashMap::new()),
@@ -151,13 +178,18 @@ impl HudiConfigs {
parser.parse_value(&self.raw_configs)
}

/// Panics if neither the value nor a default value exists.
pub fn get_or_default(
&self,
parser: impl ConfigParser<Output = HudiConfigValue>,
) -> HudiConfigValue {
parser.parse_value_or_default(&self.raw_configs)
}

/// Gets the value without panicking (see the sketch below).
///
/// - If the value exists, returns `Some(...)`.
/// - If the value does not exist, falls back to the default value provided by [ConfigParser::default_value].
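///
/// A minimal sketch, assuming [crate::config::read::HudiReadConfig] implements [ConfigParser]
/// with `Output = HudiConfigValue`:
///
/// ```rust
/// use std::collections::HashMap;
///
/// use hudi_core::config::HudiConfigs;
/// use hudi_core::config::read::HudiReadConfig::InputPartitions;
///
/// let raw_configs = HashMap::from([(InputPartitions.as_ref().to_string(), "2".to_string())]);
/// let configs = HudiConfigs::new(raw_configs);
/// let _input_partitions = configs.try_get(InputPartitions);
/// ```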
pub fn try_get(
&self,
parser: impl ConfigParser<Output = HudiConfigValue>,
26 changes: 26 additions & 0 deletions crates/core/src/config/read.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
//! Hudi read configuration
use std::collections::HashMap;
use std::str::FromStr;
@@ -24,9 +25,34 @@ use crate::config::{ConfigParser, HudiConfigValue};
use anyhow::{anyhow, Result};
use strum_macros::EnumIter;

/// Hudi read configuration
///
/// These configs control the behavior of read operations.
///
/// **Example**
///
/// ```rust
/// use url::Url;
/// use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp, InputPartitions};
/// use hudi_core::table::Table as HudiTable;
///
/// let options = vec![(InputPartitions.as_ref(), "2"),
/// (AsOfTimestamp.as_ref(), "20240101010100000")];
/// let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
/// HudiTable::new_with_options(base_uri.as_ref(), options);
/// ```
///
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
/// Defines the number of input splits.
/// - Hoodie Key : hoodie.read.input.partitions
///
/// For example, with 100 files and [InputPartitions] set to 5, the files are split into 5 chunks,
/// so each iterator or task processes 20 files.
InputPartitions,

/// The query instant for time travel. If this option is not specified, the latest snapshot is queried.
/// - Hoodie Key : hoodie.read.as.of.timestamp
AsOfTimestamp,
}

75 changes: 75 additions & 0 deletions crates/core/src/config/table.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
//! Hudi table configuration
use std::collections::HashMap;
use std::str::FromStr;
@@ -26,22 +27,92 @@ use strum_macros::{AsRefStr, EnumIter};

use crate::config::{ConfigParser, HudiConfigValue};

/// Hudi table configuration
///
/// **Example**
///
/// ```rust
/// use url::Url;
/// use hudi_core::config::table::HudiTableConfig::BaseFileFormat;
/// use hudi_core::table::Table as HudiTable;
///
/// let options = vec![(BaseFileFormat.as_ref(), "parquet")];
/// let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
/// HudiTable::new_with_options(base_uri.as_ref(), options);
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiTableConfig {
/// Base file format
/// - Hoodie Key : hoodie.table.base.file.format
///
/// Currently only parquet is supported.
BaseFileFormat,

/// Table checksum is used to guard against partial writes in HDFS.
/// It is added as the last entry in hoodie.properties and then used to validate while reading table config.
/// - Hoodie Key : hoodie.table.checksum
Checksum,

/// Database name that will be used for incremental query.
/// If different databases have the same table name during incremental query,
/// this option can be set to scope the table name to a specific database.
/// - Hoodie Key : hoodie.database.name
DatabaseName,

/// When set to true, the partition columns are not written into the Hudi table. Defaults to false.
/// - Hoodie Key : hoodie.datasource.write.drop.partition.columns
DropsPartitionFields,

/// Flag to indicate whether to use Hive style partitioning.
/// If set to true, the names of partition folders follow the <partition_column_name>=<partition_value> format.
/// Defaults to false (the names of partition folders contain only partition values).
/// - Hoodie Key : hoodie.datasource.write.hive_style_partitioning
IsHiveStylePartitioning,

/// Should we url encode the partition path value, before creating the folder structure.
/// - Hoodie Key : hoodie.datasource.write.partitionpath.urlencode
IsPartitionPathUrlencoded,

/// Key Generator class property for the hoodie table
/// - Hoodie Key : hoodie.table.keygenerator.class
KeyGeneratorClass,

/// Fields used to partition the table. Concatenated values of these fields are used as
/// the partition path, by invoking toString().
/// These fields also include the partition type which is used by custom key generators
/// - Hoodie Key : hoodie.table.partition.fields
PartitionFields,

/// Field used in preCombining before actual write. By default, when two records have the same key value,
/// the largest value for the precombine field determined by Object.compareTo(..), is picked.
/// - Hoodie Key : hoodie.table.precombine.field
PrecombineField,

/// When enabled, populates all meta fields. When disabled, no meta fields are populated
/// and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing
/// - Hoodie Key : hoodie.populate.meta.fields
PopulatesMetaFields,

/// Columns used to uniquely identify the table.
/// Concatenated values of these fields are used as the record key component of HoodieKey.
/// - Hoodie Key : hoodie.table.recordkey.fields
RecordKeyFields,

/// Table name that will be used for registering with Hive. Needs to be same across runs.
/// - Hoodie Key : hoodie.table.name
TableName,

/// The table type for the underlying data, for this write. This can’t change between writes.
/// - Hoodie Key : hoodie.table.type
TableType,

/// Version of table, used for running upgrade/downgrade steps between releases with potentially
/// breaking/backwards compatible changes.
/// - Hoodie Key : hoodie.table.version
TableVersion,

/// Version of timeline used, by the table.
/// - Hoodie Key : hoodie.timeline.layout.version
TimelineLayoutVersion,
}

@@ -129,6 +200,7 @@ impl ConfigParser for HudiTableConfig {
}
}

/// Hudi table type
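///
/// A minimal sketch, assuming `from_str` accepts the serialized form shown in the
/// `strum` attributes:
///
/// ```rust
/// use std::str::FromStr;
/// use hudi_core::config::table::TableTypeValue;
///
/// let table_type = TableTypeValue::from_str("COPY_ON_WRITE").unwrap();
/// assert_eq!(table_type.as_ref(), "COPY_ON_WRITE");
/// ```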
#[derive(Clone, Debug, PartialEq, AsRefStr)]
pub enum TableTypeValue {
#[strum(serialize = "COPY_ON_WRITE")]
@@ -149,6 +221,9 @@ impl FromStr for TableTypeValue {
}
}

/// Hudi base file format
///
/// Only parquet is supported for now.
#[derive(Clone, Debug, PartialEq, AsRefStr)]
pub enum BaseFileFormatValue {
#[strum(serialize = "parquet")]
19 changes: 19 additions & 0 deletions crates/core/src/file_group/mod.rs
@@ -16,6 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
//! Hudi file group implementation
//!
//! A set of data/base files plus a set of log files that together make up a unit for all operations.
use std::collections::BTreeMap;
use std::fmt;
@@ -29,15 +32,27 @@ use crate::storage::file_info::FileInfo;
use crate::storage::file_stats::FileStats;
use crate::storage::Storage;

/// Represents common metadata about a base file.
/// A base file can be a Hudi base file or an external (non-Hudi) base file.
#[derive(Clone, Debug)]
pub struct BaseFile {
/// The file group this file belongs to
pub file_group_id: String,

/// The file commit time
pub commit_time: String,

/// Base file info
pub info: FileInfo,

/// Base file stats
pub stats: Option<FileStats>,
}

impl BaseFile {
/// Parses the file name.
///
/// Returns `(file_group_id, commit_time)`.
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
@@ -47,6 +62,7 @@ impl BaseFile {
Ok((file_group_id, commit_time))
}

/// Constructs a [BaseFile] from a file name.
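///
/// A minimal sketch, assuming the usual Hudi base file naming of
/// `<file_group_id>_<write_token>_<commit_time>.parquet` (the concrete name below is made up):
///
/// ```rust
/// use hudi_core::file_group::BaseFile;
///
/// let file_name = "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
/// let base_file = BaseFile::from_file_name(file_name).unwrap();
/// assert_eq!(base_file.commit_time, "20240402144910683");
/// ```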
pub fn from_file_name(file_name: &str) -> Result<Self> {
let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
Ok(Self {
@@ -57,6 +73,7 @@ impl BaseFile {
})
}

/// Constructs a [BaseFile] from a [FileInfo].
pub fn from_file_info(info: FileInfo) -> Result<Self> {
let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
Ok(Self {
@@ -68,6 +85,8 @@ }
}
}

/// Within a file group, a slice is the combination of a data file written at a commit time and the list of log files
/// containing changes to that data file since that commit time.
#[derive(Clone, Debug)]
pub struct FileSlice {
pub base_file: BaseFile,
