Skip to content

Commit

Permalink
feat(storage): support next_epoch method for StateStoreReadLog (#20068)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jan 12, 2025
1 parent 326ec19 commit 3b004c6
Show file tree
Hide file tree
Showing 24 changed files with 476 additions and 189 deletions.
29 changes: 0 additions & 29 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ async fn test_hummock_compaction_task() {
uncommitted_ssts: to_local_sstable_info(&original_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -242,7 +241,6 @@ async fn test_hummock_table() {
uncommitted_ssts: to_local_sstable_info(&original_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -305,7 +303,6 @@ async fn test_hummock_transaction() {
uncommitted_ssts: to_local_sstable_info(&tables_in_epoch1),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -352,7 +349,6 @@ async fn test_hummock_transaction() {
uncommitted_ssts: to_local_sstable_info(&tables_in_epoch2),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -508,7 +504,6 @@ async fn test_hummock_manager_basic() {
uncommitted_ssts: to_local_sstable_info(&original_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -631,7 +626,6 @@ async fn test_pin_snapshot_response_lost() {
uncommitted_ssts: to_local_sstable_info(&test_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -660,7 +654,6 @@ async fn test_pin_snapshot_response_lost() {
uncommitted_ssts: to_local_sstable_info(&test_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -696,7 +689,6 @@ async fn test_pin_snapshot_response_lost() {
uncommitted_ssts: to_local_sstable_info(&test_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -724,7 +716,6 @@ async fn test_pin_snapshot_response_lost() {
uncommitted_ssts: to_local_sstable_info(&test_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -763,7 +754,6 @@ async fn test_print_compact_task() {
uncommitted_ssts: to_local_sstable_info(&original_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -803,7 +793,6 @@ async fn test_invalid_sst_id() {
uncommitted_ssts: ssts.clone(),
..Default::default()
},
false,
)
.await
.unwrap_err();
Expand All @@ -829,7 +818,6 @@ async fn test_invalid_sst_id() {
uncommitted_ssts: ssts_below_watermerk,
..Default::default()
},
false,
)
.await
.unwrap_err();
Expand All @@ -845,7 +833,6 @@ async fn test_invalid_sst_id() {
uncommitted_ssts: ssts.clone(),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -987,7 +974,6 @@ async fn test_hummock_compaction_task_heartbeat() {
uncommitted_ssts: to_local_sstable_info(&original_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1117,7 +1103,6 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() {
uncommitted_ssts: to_local_sstable_info(&original_tables),
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1237,7 +1222,6 @@ async fn test_extend_objects_to_delete() {
uncommitted_ssts: vec![],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1319,7 +1303,6 @@ async fn test_version_stats() {
uncommitted_ssts: ssts,
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1429,7 +1412,6 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_commit() {
uncommitted_ssts: vec![sst_1],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1516,7 +1498,6 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_basic()
uncommitted_ssts: vec![sst_1, sst_2],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1601,7 +1582,6 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_non_triv
uncommitted_ssts: vec![sst_1],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1703,7 +1683,6 @@ async fn test_move_state_tables_to_dedicated_compaction_group_trivial_expired()
uncommitted_ssts: vec![sst_1, sst_2, sst_3, sst_4],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1850,7 +1829,6 @@ async fn test_move_state_tables_to_dedicated_compaction_group_on_demand_bottom_l
uncommitted_ssts: vec![sst_1, sst_2, sst_3],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1986,7 +1964,6 @@ async fn test_compaction_task_expiration_due_to_split_group() {
uncommitted_ssts: vec![sst_1, sst_2],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2061,7 +2038,6 @@ async fn test_move_tables_between_compaction_group() {
uncommitted_ssts: vec![sst_1.clone()],
..Default::default()
},
false,
)
.await
.unwrap();
Expand All @@ -2075,7 +2051,6 @@ async fn test_move_tables_between_compaction_group() {
uncommitted_ssts: vec![sst_2.clone()],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2220,7 +2195,6 @@ async fn test_partition_level() {
uncommitted_ssts: vec![sst_1],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2257,7 +2231,6 @@ async fn test_partition_level() {
uncommitted_ssts: vec![sst],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2349,7 +2322,6 @@ async fn test_unregister_moved_table() {
uncommitted_ssts: vec![sst_1, sst_2],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -2474,7 +2446,6 @@ async fn test_merge_compaction_group_task_expired() {
uncommitted_ssts: vec![sst_1, sst_2, sst_3, sst_4],
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down
45 changes: 21 additions & 24 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ use risingwave_pb::hummock::{
SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::error::{Result, RpcError};
use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient};
use risingwave_rpc_client::{
CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -134,11 +136,11 @@ impl HummockMetaClient for MockHummockMetaClient {
})
}

async fn commit_epoch(
async fn commit_epoch_with_change_log(
&self,
epoch: HummockEpoch,
sync_result: SyncResult,
is_log_store: bool,
change_log_info: Option<HummockMetaClientChangeLogInfo>,
) -> Result<()> {
let version: HummockVersion = self.hummock_manager.get_current_version().await;
let table_ids = version
Expand All @@ -148,17 +150,13 @@ impl HummockMetaClient for MockHummockMetaClient {
.map(|table_id| table_id.table_id)
.collect::<BTreeSet<_>>();

let old_value_ssts_vec = if is_log_store {
sync_result.old_value_ssts.clone()
} else {
vec![]
};
let commit_table_ids = sync_result
.uncommitted_ssts
.iter()
.flat_map(|sstable| sstable.sst_info.table_ids.clone())
.chain({
old_value_ssts_vec
sync_result
.old_value_ssts
.iter()
.flat_map(|sstable| sstable.sst_info.table_ids.clone())
})
Expand Down Expand Up @@ -192,22 +190,21 @@ impl HummockMetaClient for MockHummockMetaClient {
.map(|LocalSstableInfo { sst_info, .. }| (sst_info.object_id, self.context_id))
.collect();
let new_table_watermark = sync_result.table_watermarks;
let table_change_log_table_ids = if is_log_store {
commit_table_ids.clone()
} else {
BTreeSet::new()
let table_change_log = match change_log_info {
Some(epochs) => {
assert_eq!(*epochs.last().expect("non-empty"), epoch);
build_table_change_log_delta(
sync_result
.old_value_ssts
.into_iter()
.map(|sst| sst.sst_info),
sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
&epochs,
commit_table_ids.iter().map(|&table_id| (table_id, 0)),
)
}
None => Default::default(),
};
let table_change_log = build_table_change_log_delta(
sync_result
.old_value_ssts
.into_iter()
.map(|sst| sst.sst_info),
sync_result.uncommitted_ssts.iter().map(|sst| &sst.sst_info),
&vec![epoch],
table_change_log_table_ids
.into_iter()
.map(|table_id| (table_id, 0)),
);

self.hummock_manager
.commit_epoch(CommitEpochInfo {
Expand Down
3 changes: 0 additions & 3 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ pub async fn add_test_tables(
uncommitted_ssts: test_local_tables,
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -138,7 +137,6 @@ pub async fn add_test_tables(
uncommitted_ssts: test_local_tables_3,
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -406,7 +404,6 @@ pub async fn add_ssts(
uncommitted_ssts: ssts,
..Default::default()
},
false,
)
.await
.unwrap();
Expand Down
10 changes: 8 additions & 2 deletions src/rpc_client/src/hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,24 @@ pub type CompactionEventItem = std::result::Result<SubscribeCompactionEventRespo

use crate::error::Result;

pub type HummockMetaClientChangeLogInfo = Vec<u64>;

#[async_trait]
pub trait HummockMetaClient: Send + Sync + 'static {
async fn unpin_version_before(&self, unpin_version_before: HummockVersionId) -> Result<()>;
async fn get_current_version(&self) -> Result<HummockVersion>;
async fn get_new_sst_ids(&self, number: u32) -> Result<SstObjectIdRange>;
// We keep `commit_epoch` only for test/benchmark.
async fn commit_epoch(
async fn commit_epoch_with_change_log(
&self,
epoch: HummockEpoch,
sync_result: SyncResult,
is_log_store: bool,
change_log_info: Option<HummockMetaClientChangeLogInfo>,
) -> Result<()>;
async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()> {
self.commit_epoch_with_change_log(epoch, sync_result, None)
.await
}
async fn trigger_manual_compaction(
&self,
compaction_group_id: u64,
Expand Down
4 changes: 3 additions & 1 deletion src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient};
pub use hummock_meta_client::{
CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
};
pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
use rw_futures_util::await_future_with_monitor_error_stream;
pub use sink_coordinate_client::CoordinatorStreamHandle;
Expand Down
Loading

0 comments on commit 3b004c6

Please sign in to comment.