Skip to content

Commit

Permalink
feat: Introduce pluggable block id manager (#41)
Browse files Browse the repository at this point in the history
* feat: Introduce pluggable block id manager

* add more tests

* expose block id number in apps web page

* print the manager type

* enum display
  • Loading branch information
zuston authored Jan 17, 2025
1 parent d9cf729 commit a18e6e0
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 44 deletions.
68 changes: 27 additions & 41 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use std::ops::Deref;
use std::str::FromStr;

use crate::await_tree::AWAIT_TREE_REGISTRY;
use crate::block_id_manager::{get_block_id_manager, BlockIdManager};
use crate::constant::ALL_LABEL;
use crate::grpc::protobuf::uniffle::{BlockIdLayout, RemoteStorage};
use crate::id_layout::IdLayout;
Expand Down Expand Up @@ -151,7 +152,7 @@ pub struct App {
pub(crate) registry_timestamp: u128,

// key: shuffle_id, val: shuffle's all block_ids bitmap
block_id_bitmap: DashMap<i32, Arc<RwLock<Treemap>>>,
block_id_manager: Arc<Box<dyn BlockIdManager>>,

// key: (shuffle_id, partition_id)
partition_meta_infos: DashMap<(i32, i32), PartitionedMeta>,
Expand Down Expand Up @@ -255,7 +256,11 @@ impl App {
app_id.as_str()
);
}

let block_id_manager = get_block_id_manager(&config.app_config.block_id_manager_type);
info!(
"Using the block id manager: {} for app: {}",
&config.app_config.block_id_manager_type, &app_id
);
App {
app_id,
app_config_options: config_options,
Expand All @@ -268,10 +273,14 @@ impl App {
total_resident_data_size: Default::default(),
huge_partition_number: Default::default(),
registry_timestamp: now_timestamp_as_millis(),
block_id_bitmap: Default::default(),
block_id_manager,
}
}

pub fn reported_block_id_number(&self) -> u64 {
self.block_id_manager.get_blocks_number().unwrap_or(0)
}

pub fn huge_partition_number(&self) -> u64 {
self.huge_partition_number.load(SeqCst)
}
Expand Down Expand Up @@ -478,41 +487,12 @@ impl App {

pub async fn get_multi_block_ids(&self, ctx: GetMultiBlockIdsContext) -> Result<Bytes> {
self.heartbeat()?;

let shuffle_id = &ctx.shuffle_id;
let block_id_layout = &ctx.layout;
let partitions: HashSet<&i32> = HashSet::from_iter(&ctx.partition_ids);

let treemap = self
.block_id_bitmap
.entry(*shuffle_id)
.or_insert_with(|| Arc::new(RwLock::new(Treemap::new())))
.clone();
let treemap = treemap.read();
let mut retrieved = Treemap::new();
for element in treemap.iter() {
let partition_id = block_id_layout.get_partition_id(element as i64);
if partitions.contains(&(partition_id as i32)) {
retrieved.add(element);
}
}
Ok(Bytes::from(retrieved.serialize::<JvmLegacy>()))
self.block_id_manager.get_multi_block_ids(ctx).await
}

pub async fn report_multi_block_ids(&self, ctx: ReportMultiBlockIdsContext) -> Result<()> {
self.heartbeat()?;

let shuffle_id = &ctx.shuffle_id;
let treemap = self
.block_id_bitmap
.entry(*shuffle_id)
.or_insert_with(|| Arc::new(RwLock::new(Treemap::new())))
.clone();
let mut treemap = treemap.write();
for block_id in ctx.block_ids {
treemap.add(block_id as u64);
}

self.block_id_manager.report_multi_block_ids(ctx).await?;
Ok(())
}

Expand All @@ -524,7 +504,7 @@ impl App {

if let Some(shuffle_id) = shuffle_id {
// shuffle level bitmap deletion
self.block_id_bitmap.remove(&shuffle_id);
self.block_id_manager.purge_block_ids(shuffle_id).await?;

let mut deletion_keys = vec![];
let view = self.partition_meta_infos.clone().into_read_only();
Expand Down Expand Up @@ -620,10 +600,10 @@ pub struct ReportBlocksContext {
#[derive(Debug, Clone)]
pub struct ReportMultiBlockIdsContext {
pub shuffle_id: i32,
pub block_ids: Vec<i64>,
pub block_ids: HashMap<i32, Vec<i64>>,
}
impl ReportMultiBlockIdsContext {
pub fn new(shuffle_id: i32, block_ids: Vec<i64>) -> ReportMultiBlockIdsContext {
pub fn new(shuffle_id: i32, block_ids: HashMap<i32, Vec<i64>>) -> ReportMultiBlockIdsContext {
Self {
shuffle_id,
block_ids,
Expand Down Expand Up @@ -1025,17 +1005,18 @@ pub(crate) mod test {
RequireBufferContext, WritingViewContext,
};
use crate::config::{Config, HybridStoreConfig, LocalfileStoreConfig, MemoryStoreConfig};
use bytes::Bytes;
use std::sync::Arc;

use crate::error::WorkerError;
use crate::id_layout::{to_layout, IdLayout, DEFAULT_BLOCK_ID_LAYOUT};
use crate::runtime::manager::RuntimeManager;
use crate::storage::StorageService;
use crate::store::{Block, ResponseData};
use bytes::Bytes;
use crc32fast::hash;
use croaring::{JvmLegacy, Treemap};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;

#[test]
fn test_uid_hash() {
Expand Down Expand Up @@ -1245,7 +1226,12 @@ pub(crate) mod test {
runtime_manager
.wait(app.report_multi_block_ids(ReportMultiBlockIdsContext {
shuffle_id: 1,
block_ids: vec![block_id_1.clone(), block_id_2.clone(), block_id_3.clone()],
block_ids: {
let mut hashmap = HashMap::new();
hashmap.insert(10, vec![block_id_1.clone(), block_id_2.clone()]);
hashmap.insert(20, vec![block_id_3.clone()]);
hashmap
},
}))
.expect("TODO: panic message");

Expand Down
Loading

0 comments on commit a18e6e0

Please sign in to comment.