From e548a64c61e6f3cd112964882fc6552a3edb9cc9 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 17 Jan 2025 10:50:20 +0800 Subject: [PATCH 1/5] feat: Introduce pluggable block id manager --- src/app.rs | 60 +++++--------- src/block_id_manager.rs | 171 ++++++++++++++++++++++++++++++++++++++++ src/config.rs | 9 +++ src/grpc/service.rs | 7 +- src/lib.rs | 1 + src/main.rs | 1 + 6 files changed, 206 insertions(+), 43 deletions(-) create mode 100644 src/block_id_manager.rs diff --git a/src/app.rs b/src/app.rs index 590d19d..512aeff 100644 --- a/src/app.rs +++ b/src/app.rs @@ -46,6 +46,7 @@ use std::ops::Deref; use std::str::FromStr; use crate::await_tree::AWAIT_TREE_REGISTRY; +use crate::block_id_manager::{get_block_id_manager, BlockIdManager}; use crate::constant::ALL_LABEL; use crate::grpc::protobuf::uniffle::{BlockIdLayout, RemoteStorage}; use crate::id_layout::IdLayout; @@ -151,7 +152,7 @@ pub struct App { pub(crate) registry_timestamp: u128, // key: shuffle_id, val: shuffle's all block_ids bitmap - block_id_bitmap: DashMap>>, + block_id_manager: Arc>, // key: (shuffle_id, partition_id) partition_meta_infos: DashMap<(i32, i32), PartitionedMeta>, @@ -255,7 +256,7 @@ impl App { app_id.as_str() ); } - + let block_id_manager = get_block_id_manager(&config.app_config.block_id_manager_type); App { app_id, app_config_options: config_options, @@ -268,7 +269,7 @@ impl App { total_resident_data_size: Default::default(), huge_partition_number: Default::default(), registry_timestamp: now_timestamp_as_millis(), - block_id_bitmap: Default::default(), + block_id_manager, } } @@ -478,41 +479,12 @@ impl App { pub async fn get_multi_block_ids(&self, ctx: GetMultiBlockIdsContext) -> Result { self.heartbeat()?; - - let shuffle_id = &ctx.shuffle_id; - let block_id_layout = &ctx.layout; - let partitions: HashSet<&i32> = HashSet::from_iter(&ctx.partition_ids); - - let treemap = self - .block_id_bitmap - .entry(*shuffle_id) - .or_insert_with(|| Arc::new(RwLock::new(Treemap::new()))) - .clone(); - let treemap = treemap.read(); - let mut retrieved = Treemap::new(); - for element in treemap.iter() { - let partition_id = block_id_layout.get_partition_id(element as i64); - if partitions.contains(&(partition_id as i32)) { - retrieved.add(element); - } - } - Ok(Bytes::from(retrieved.serialize::())) + self.block_id_manager.get_multi_block_ids(ctx).await } pub async fn report_multi_block_ids(&self, ctx: ReportMultiBlockIdsContext) -> Result<()> { self.heartbeat()?; - - let shuffle_id = &ctx.shuffle_id; - let treemap = self - .block_id_bitmap - .entry(*shuffle_id) - .or_insert_with(|| Arc::new(RwLock::new(Treemap::new()))) - .clone(); - let mut treemap = treemap.write(); - for block_id in ctx.block_ids { - treemap.add(block_id as u64); - } - + self.block_id_manager.report_multi_block_ids(ctx).await?; Ok(()) } @@ -524,7 +496,7 @@ impl App { if let Some(shuffle_id) = shuffle_id { // shuffle level bitmap deletion - self.block_id_bitmap.remove(&shuffle_id); + self.block_id_manager.purge_block_ids(shuffle_id).await?; let mut deletion_keys = vec![]; let view = self.partition_meta_infos.clone().into_read_only(); @@ -620,10 +592,10 @@ pub struct ReportBlocksContext { #[derive(Debug, Clone)] pub struct ReportMultiBlockIdsContext { pub shuffle_id: i32, - pub block_ids: Vec, + pub block_ids: HashMap>, } impl ReportMultiBlockIdsContext { - pub fn new(shuffle_id: i32, block_ids: Vec) -> ReportMultiBlockIdsContext { + pub fn new(shuffle_id: i32, block_ids: HashMap>) -> ReportMultiBlockIdsContext { Self { shuffle_id, block_ids, @@ -1025,17 +997,18 @@ pub(crate) mod test { RequireBufferContext, WritingViewContext, }; use crate::config::{Config, HybridStoreConfig, LocalfileStoreConfig, MemoryStoreConfig}; - use bytes::Bytes; - use std::sync::Arc; - use crate::error::WorkerError; use crate::id_layout::{to_layout, IdLayout, DEFAULT_BLOCK_ID_LAYOUT}; use crate::runtime::manager::RuntimeManager; use crate::storage::StorageService; use crate::store::{Block, ResponseData}; + use bytes::Bytes; + use crc32fast::hash; use croaring::{JvmLegacy, Treemap}; use dashmap::DashMap; use parking_lot::RwLock; + use std::collections::HashMap; + use std::sync::Arc; #[test] fn test_uid_hash() { @@ -1245,7 +1218,12 @@ pub(crate) mod test { runtime_manager .wait(app.report_multi_block_ids(ReportMultiBlockIdsContext { shuffle_id: 1, - block_ids: vec![block_id_1.clone(), block_id_2.clone(), block_id_3.clone()], + block_ids: { + let mut hashmap = HashMap::new(); + hashmap.insert(10, vec![block_id_1.clone(), block_id_2.clone()]); + hashmap.insert(20, vec![block_id_3.clone()]); + hashmap + }, })) .expect("TODO: panic message"); diff --git a/src/block_id_manager.rs b/src/block_id_manager.rs new file mode 100644 index 0000000..f45af63 --- /dev/null +++ b/src/block_id_manager.rs @@ -0,0 +1,171 @@ +use crate::app::{GetMultiBlockIdsContext, ReportMultiBlockIdsContext}; +use crate::block_id_manager::BlockIdManagerType::DEFAULT; +use anyhow::Result; +use async_trait::async_trait; +use bytes::Bytes; +use croaring::{JvmLegacy, Treemap}; +use dashmap::DashMap; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::Arc; + +/// The block id manager is used by the every app, so the app id will not be scoped here. +#[async_trait] +pub trait BlockIdManager: Send + Sync { + async fn get_multi_block_ids(&self, ctx: GetMultiBlockIdsContext) -> Result; + async fn report_multi_block_ids(&self, ctx: ReportMultiBlockIdsContext) -> Result<()>; + async fn purge_block_ids(&self, shuffle_id: i32) -> Result<()>; + async fn get_blocks_number(&self) -> Result; +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub enum BlockIdManagerType { + DEFAULT, + PARTITIONED, +} +impl Default for BlockIdManagerType { + fn default() -> Self { + DEFAULT + } +} + +pub fn get_block_id_manager(b_type: &BlockIdManagerType) -> Arc> { + match b_type { + BlockIdManagerType::PARTITIONED => Arc::new(Box::new(PartitionedBlockIdManager::default())), + BlockIdManagerType::DEFAULT => Arc::new(Box::new(DefaultBlockIdManager::default())), + } +} + +#[derive(Default)] +pub struct PartitionedBlockIdManager { + block_id_bitmap: DashMap>>, + number: AtomicU64, +} + +#[async_trait] +impl BlockIdManager for PartitionedBlockIdManager { + async fn get_multi_block_ids(&self, ctx: GetMultiBlockIdsContext) -> Result { + let shuffle_id = &ctx.shuffle_id; + let block_id_layout = &ctx.layout; + let partitions: HashSet<&i32> = HashSet::from_iter(&ctx.partition_ids); + + let treemap = self + .block_id_bitmap + .entry(*shuffle_id) + .or_insert_with(|| Arc::new(RwLock::new(Treemap::new()))) + .clone(); + let treemap = treemap.read(); + let mut retrieved = Treemap::new(); + for element in treemap.iter() { + let partition_id = block_id_layout.get_partition_id(element as i64); + if partitions.contains(&(partition_id as i32)) { + retrieved.add(element); + } + } + Ok(Bytes::from(retrieved.serialize::())) + } + + async fn report_multi_block_ids(&self, ctx: ReportMultiBlockIdsContext) -> Result<()> { + let shuffle_id = &ctx.shuffle_id; + let treemap = self + .block_id_bitmap + .entry(*shuffle_id) + .or_insert_with(|| Arc::new(RwLock::new(Treemap::new()))) + .clone(); + let partitioned_block_ids = ctx.block_ids; + let mut treemap = treemap.write(); + let mut number = 0; + for (_, block_ids) in partitioned_block_ids { + number += block_ids.len(); + for block_id in block_ids { + treemap.add(block_id as u64); + } + } + self.number.fetch_add(number as u64, SeqCst); + Ok(()) + } + + async fn purge_block_ids(&self, shuffle_id: i32) -> Result<()> { + if let Some(treemap) = self.block_id_bitmap.remove(&shuffle_id) { + let map = treemap.1.read(); + self.number.fetch_sub(map.cardinality(), SeqCst); + } + Ok(()) + } + + async fn get_blocks_number(&self) -> Result { + let number = self.number.load(SeqCst); + Ok(number) + } +} + +#[derive(Default)] +struct DefaultBlockIdManager { + number: AtomicU64, + // key: (shuffle_id, partition_id) + block_id_bitmap: DashMap<(i32, i32), Arc>>, +} + +#[async_trait] +impl BlockIdManager for DefaultBlockIdManager { + async fn get_multi_block_ids(&self, ctx: GetMultiBlockIdsContext) -> Result { + let shuffle_id = ctx.shuffle_id; + let partition_ids = ctx.partition_ids; + let mut treemap = Treemap::new(); + for pid in partition_ids { + if let Some(bitmap) = self.block_id_bitmap.get(&(shuffle_id, pid)) { + let bitmap = bitmap.clone(); + let bitmap = bitmap.read(); + treemap.extend(bitmap.iter()); + } + } + Ok(Bytes::from(treemap.serialize::())) + } + + async fn report_multi_block_ids(&self, ctx: ReportMultiBlockIdsContext) -> Result<()> { + let shuffle_id = ctx.shuffle_id; + let partitioned_block_ids = ctx.block_ids; + let mut number = 0; + for (pid, block_ids) in partitioned_block_ids { + number += block_ids.len(); + let treemap = self + .block_id_bitmap + .entry((shuffle_id, pid)) + .or_insert_with(|| Arc::new(RwLock::new(Treemap::new()))) + .clone(); + let mut treemap = treemap.write(); + for block_id in block_ids { + treemap.add(block_id as u64); + } + } + self.number.fetch_add(number as u64, SeqCst); + Ok(()) + } + + async fn purge_block_ids(&self, shuffle_id: i32) -> Result<()> { + let view = self.block_id_bitmap.clone().into_read_only(); + let mut deletion_keys = vec![]; + for (v_shuffle_id, v_partition_id) in view.keys() { + if *v_shuffle_id == shuffle_id { + deletion_keys.push((shuffle_id, *v_partition_id)); + } + } + drop(view); + let mut number = 0; + for deletion_key in deletion_keys { + if let Some(bitmap) = self.block_id_bitmap.remove(&deletion_key) { + let bitmap = bitmap.1.read(); + number -= bitmap.cardinality(); + } + } + self.number.fetch_sub(number, SeqCst); + Ok(()) + } + + async fn get_blocks_number(&self) -> Result { + Ok(self.number.load(SeqCst)) + } +} diff --git a/src/config.rs b/src/config.rs index 34c5310..e3be460 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::block_id_manager::BlockIdManagerType; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; @@ -355,6 +356,13 @@ pub struct AppConfig { pub huge_partition_marked_threshold: Option, pub huge_partition_memory_limit_percent: Option, + + #[serde(default = "as_default_block_id_manager_type")] + pub block_id_manager_type: BlockIdManagerType, +} + +fn as_default_block_id_manager_type() -> BlockIdManagerType { + BlockIdManagerType::DEFAULT } fn as_default_app_config() -> AppConfig { @@ -362,6 +370,7 @@ fn as_default_app_config() -> AppConfig { app_heartbeat_timeout_min: as_default_app_heartbeat_timeout_min(), huge_partition_marked_threshold: None, huge_partition_memory_limit_percent: None, + block_id_manager_type: as_default_block_id_manager_type(), } } diff --git a/src/grpc/service.rs b/src/grpc/service.rs index 1daa749..bf54c44 100644 --- a/src/grpc/service.rs +++ b/src/grpc/service.rs @@ -574,9 +574,12 @@ impl ShuffleServer for DefaultShuffleServer { })); } let app = app.unwrap(); - let mut block_ids = vec![]; + let mut block_ids = HashMap::new(); for partition_to_block_id in partition_to_block_ids { - block_ids.extend(partition_to_block_id.block_ids); + block_ids.insert( + partition_to_block_id.partition_id, + partition_to_block_id.block_ids, + ); } match app .report_multi_block_ids(ReportMultiBlockIdsContext::new(shuffle_id, block_ids)) diff --git a/src/lib.rs b/src/lib.rs index f1236a1..e6d3f45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ pub mod semaphore_with_index; pub mod storage; pub mod bits; +pub mod block_id_manager; pub mod histogram; pub mod id_layout; diff --git a/src/main.rs b/src/main.rs index bc1a428..42c5536 100644 --- a/src/main.rs +++ b/src/main.rs @@ -67,6 +67,7 @@ mod log_service; mod logforth_service; pub mod bits; +pub mod block_id_manager; pub mod histogram; mod mem_allocator; mod metric; From 9c54187f93e4b76f8b09993aee4d5827f7aa5fa1 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 17 Jan 2025 11:10:14 +0800 Subject: [PATCH 2/5] add more tests --- src/block_id_manager.rs | 91 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/src/block_id_manager.rs b/src/block_id_manager.rs index f45af63..e1b772d 100644 --- a/src/block_id_manager.rs +++ b/src/block_id_manager.rs @@ -158,7 +158,7 @@ impl BlockIdManager for DefaultBlockIdManager { for deletion_key in deletion_keys { if let Some(bitmap) = self.block_id_bitmap.remove(&deletion_key) { let bitmap = bitmap.1.read(); - number -= bitmap.cardinality(); + number += bitmap.cardinality(); } } self.number.fetch_sub(number, SeqCst); @@ -169,3 +169,92 @@ impl BlockIdManager for DefaultBlockIdManager { Ok(self.number.load(SeqCst)) } } + +#[cfg(test)] +mod tests { + use crate::app::{GetMultiBlockIdsContext, ReportMultiBlockIdsContext}; + use crate::block_id_manager::{get_block_id_manager, BlockIdManager, BlockIdManagerType}; + use crate::id_layout::{to_layout, DEFAULT_BLOCK_ID_LAYOUT}; + use anyhow::Result; + use croaring::{JvmLegacy, Treemap}; + use futures::future::ok; + use std::collections::{HashMap, HashSet}; + use std::sync::Arc; + + async fn test_block_id_manager(manager: Arc>) -> Result<()> { + let shuffle_id = 10; + + let mut partitioned_block_ids = HashMap::new(); + for pid in 0..100 { + let mut block_ids = vec![]; + for idx in 0..20 { + let block_id = DEFAULT_BLOCK_ID_LAYOUT.get_block_id(idx, pid, idx + pid); + block_ids.push(block_id); + } + partitioned_block_ids.insert(pid as i32, block_ids); + } + + // report + manager + .report_multi_block_ids(ReportMultiBlockIdsContext { + shuffle_id, + block_ids: partitioned_block_ids.clone(), + }) + .await?; + assert_eq!(100 * 20, manager.get_blocks_number().await?); + + // get by one partition + for partition_id in 0..100 { + let gotten = manager + .get_multi_block_ids(GetMultiBlockIdsContext { + shuffle_id, + partition_ids: vec![partition_id], + layout: to_layout(None), + }) + .await?; + let deserialized = Treemap::deserialize::(&gotten); + assert_eq!(20, deserialized.cardinality()); + let layout = to_layout(None); + for block_id in deserialized.iter() { + assert_eq!( + partition_id, + layout.get_partition_id(block_id as i64) as i32 + ); + } + } + + // get by multi partition + let partition_ids = vec![1, 2, 3, 4]; + let gotten = manager + .get_multi_block_ids(GetMultiBlockIdsContext { + shuffle_id, + partition_ids: partition_ids.clone(), + layout: to_layout(None), + }) + .await?; + let deserialized = Treemap::deserialize::(&gotten); + assert_eq!(20 * 4, deserialized.cardinality()); + let layout = to_layout(None); + let hash_ids = HashSet::::from_iter(partition_ids); + for block_id in deserialized.iter() { + let pid = layout.get_partition_id(block_id as i64) as i32; + if !hash_ids.contains(&pid) { + panic!() + } + } + + // purge + manager.purge_block_ids(shuffle_id).await?; + assert_eq!(0, manager.get_blocks_number().await?); + + Ok(()) + } + + #[tokio::test] + async fn test() -> Result<()> { + test_block_id_manager(get_block_id_manager(&BlockIdManagerType::DEFAULT)).await?; + test_block_id_manager(get_block_id_manager(&BlockIdManagerType::PARTITIONED)).await?; + + Ok(()) + } +} From a5010d26a0c9319a87ec87953a2921990480fe35 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 17 Jan 2025 11:17:55 +0800 Subject: [PATCH 3/5] expose block id number in apps web page --- src/app.rs | 4 ++++ src/block_id_manager.rs | 10 +++++----- src/http/apps.rs | 4 +++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/app.rs b/src/app.rs index 512aeff..46ee307 100644 --- a/src/app.rs +++ b/src/app.rs @@ -273,6 +273,10 @@ impl App { } } + pub fn reported_block_id_number(&self) -> u64 { + self.block_id_manager.get_blocks_number().unwrap_or(0) + } + pub fn huge_partition_number(&self) -> u64 { self.huge_partition_number.load(SeqCst) } diff --git a/src/block_id_manager.rs b/src/block_id_manager.rs index e1b772d..6ed20e5 100644 --- a/src/block_id_manager.rs +++ b/src/block_id_manager.rs @@ -18,7 +18,7 @@ pub trait BlockIdManager: Send + Sync { async fn get_multi_block_ids(&self, ctx: GetMultiBlockIdsContext) -> Result; async fn report_multi_block_ids(&self, ctx: ReportMultiBlockIdsContext) -> Result<()>; async fn purge_block_ids(&self, shuffle_id: i32) -> Result<()>; - async fn get_blocks_number(&self) -> Result; + fn get_blocks_number(&self) -> Result; } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] @@ -96,7 +96,7 @@ impl BlockIdManager for PartitionedBlockIdManager { Ok(()) } - async fn get_blocks_number(&self) -> Result { + fn get_blocks_number(&self) -> Result { let number = self.number.load(SeqCst); Ok(number) } @@ -165,7 +165,7 @@ impl BlockIdManager for DefaultBlockIdManager { Ok(()) } - async fn get_blocks_number(&self) -> Result { + fn get_blocks_number(&self) -> Result { Ok(self.number.load(SeqCst)) } } @@ -201,7 +201,7 @@ mod tests { block_ids: partitioned_block_ids.clone(), }) .await?; - assert_eq!(100 * 20, manager.get_blocks_number().await?); + assert_eq!(100 * 20, manager.get_blocks_number()?); // get by one partition for partition_id in 0..100 { @@ -245,7 +245,7 @@ mod tests { // purge manager.purge_block_ids(shuffle_id).await?; - assert_eq!(0, manager.get_blocks_number().await?); + assert_eq!(0, manager.get_blocks_number()?); Ok(()) } diff --git a/src/http/apps.rs b/src/http/apps.rs index 02b5159..878ce36 100644 --- a/src/http/apps.rs +++ b/src/http/apps.rs @@ -51,6 +51,7 @@ fn table() -> Html { duration (minutes) resident data (gb) partition number/huge partition + reported block id number "# .to_string(); @@ -73,13 +74,14 @@ fn table() -> Html { .to_string(); html_content.push_str(&format!( - "{}{}{}{}{}/{}", + "{}{}{}{}{}/{}{}", app_id, date, duration_min, bytes_to_gb(resident_bytes), app.partition_number(), app.huge_partition_number(), + app.reported_block_id_number(), )); } From 5ca202ccc5a949cb6c0ba50d779f1669f0f2a2cb Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 17 Jan 2025 11:22:25 +0800 Subject: [PATCH 4/5] print the manager type --- src/app.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/app.rs b/src/app.rs index 46ee307..4f0a256 100644 --- a/src/app.rs +++ b/src/app.rs @@ -257,6 +257,10 @@ impl App { ); } let block_id_manager = get_block_id_manager(&config.app_config.block_id_manager_type); + info!( + "Using the block id manager: {} for app: {}", + &config.app_config.block_id_manager_type, &app_id + ); App { app_id, app_config_options: config_options, From 103d37c86a46dba672002d881f4f8b57f46d2a43 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 17 Jan 2025 11:34:55 +0800 Subject: [PATCH 5/5] enum display --- src/block_id_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/block_id_manager.rs b/src/block_id_manager.rs index 6ed20e5..179aa21 100644 --- a/src/block_id_manager.rs +++ b/src/block_id_manager.rs @@ -21,7 +21,7 @@ pub trait BlockIdManager: Send + Sync { fn get_blocks_number(&self) -> Result; } -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, strum_macros::Display)] pub enum BlockIdManagerType { DEFAULT, PARTITIONED,