From ddb3255218700e217d5156567c8162fbe7fc5992 Mon Sep 17 00:00:00 2001 From: Dat Tien Nguyen Date: Tue, 8 Oct 2024 23:42:28 +0700 Subject: [PATCH] feat: add disable_proposal_forwarding config params Signed-off-by: Dat Tien Nguyen --- harness/tests/integration_cases/test_raft.rs | 112 +++++++++++++++++++ src/config.rs | 4 + src/raft.rs | 12 ++ 3 files changed, 128 insertions(+) diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index a62266d2..4fdcf98a 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -5851,3 +5851,115 @@ fn test_switching_check_quorum() { } assert_eq!(sm.state, StateRole::Leader); } + +#[test] +fn test_disable_proposal_forwarding() { + let l = default_logger(); + + let n1 = new_test_raft_with_config( + &Config { + id: 1, + heartbeat_tick: 1, + election_tick: 10, + disable_proposal_forwarding: true, + ..Default::default() + }, + MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])), + &l, + ); + + let n2 = new_test_raft_with_config( + &Config { + id: 2, + heartbeat_tick: 1, + election_tick: 10, + disable_proposal_forwarding: true, + ..Default::default() + }, + MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])), + &l, + ); + + let n3 = new_test_raft_with_config( + &Config { + id: 3, + heartbeat_tick: 1, + election_tick: 10, + disable_proposal_forwarding: true, + ..Default::default() + }, + MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])), + &l, + ); + + let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l); + + // node 1 starts campaign to become leader. + network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + + assert_eq!(network.peers.get(&1).unwrap().state, StateRole::Leader); + assert_eq!(network.peers.get(&2).unwrap().state, StateRole::Follower); + assert_eq!(network.peers.get(&3).unwrap().state, StateRole::Follower); + + assert_eq!(network.peers.get(&2).unwrap().leader_id, 1); + assert_eq!(network.peers.get(&3).unwrap().leader_id, 1); + + let committed_index_1 = network.peers.get(&1).unwrap().raft_log.committed; + + // nodes 2 and 3 are follower, with `disable_proposal_forwarding`` enable, proposal will be dropped. + assert_eq!( + network + .peers + .get_mut(&2) + .unwrap() + .step(new_message(2, 2, MessageType::MsgPropose, 1)), + Err(Error::ProposalDropped) + ); + assert_eq!( + network + .peers + .get_mut(&3) + .unwrap() + .step(new_message(3, 3, MessageType::MsgPropose, 1)), + Err(Error::ProposalDropped) + ); + + // assert no proposals are forwarded from follower and committed. + assert_eq!( + network.peers.get(&1).unwrap().raft_log.committed, + committed_index_1 + ); + assert_eq!( + network.peers.get(&2).unwrap().raft_log.committed, + committed_index_1 + ); + assert_eq!( + network.peers.get(&3).unwrap().raft_log.committed, + committed_index_1 + ); + + // send proposal to leader node. + network.send(vec![new_message(1, 1, MessageType::MsgPropose, 5)]); + + let committed_index_2 = network.peers.get(&1).unwrap().raft_log.committed; + if committed_index_1 >= committed_index_2 { + panic!( + "expect committed_index_1 < committed_index_2, got {:?} > {:?}", + committed_index_1, committed_index_2 + ); + } + + // assert proposals are accepted on leader node. + assert_eq!( + network.peers.get(&1).unwrap().raft_log.committed, + committed_index_2 + ); + assert_eq!( + network.peers.get(&2).unwrap().raft_log.committed, + committed_index_2 + ); + assert_eq!( + network.peers.get(&3).unwrap().raft_log.committed, + committed_index_2 + ); +} diff --git a/src/config.rs b/src/config.rs index 1bc6d323..3668db88 100644 --- a/src/config.rs +++ b/src/config.rs @@ -102,6 +102,9 @@ pub struct Config { /// Maximum raft log number that can be applied after commit but before persist. /// The default value is 0, which means apply after both commit and persist. pub max_apply_unpersisted_log_limit: u64, + + /// If enable, followers will not forward proposal to leader. + pub disable_proposal_forwarding: bool, } impl Default for Config { @@ -125,6 +128,7 @@ impl Default for Config { max_uncommitted_size: NO_LIMIT, max_committed_size_per_ready: NO_LIMIT, max_apply_unpersisted_log_limit: 0, + disable_proposal_forwarding: false, } } } diff --git a/src/raft.rs b/src/raft.rs index 91d90209..24cbe38c 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -255,6 +255,8 @@ pub struct RaftCore { /// Max size per committed entries in a `Read`. pub(crate) max_committed_size_per_ready: u64, + + disable_proposal_forwarding: bool, } /// A struct that represents the raft consensus itself. Stores details concerning the current @@ -363,6 +365,7 @@ impl Raft { last_log_tail_index: 0, }, max_committed_size_per_ready: c.max_committed_size_per_ready, + disable_proposal_forwarding: c.disable_proposal_forwarding, }, }; confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?; @@ -2337,6 +2340,15 @@ impl Raft { term = self.term; ); return Err(Error::ProposalDropped); + } else if self.disable_proposal_forwarding { + info!( + self.logger, + "{from} not forwarding to leader {to} at term {term}; dropping proposal", + from = self.id, + to = self.leader_id, + term = self.term; + ); + return Err(Error::ProposalDropped); } m.to = self.leader_id; self.r.send(m, &mut self.msgs);