Skip to content

Commit

Permalink
feat: add disable_proposal_forwarding config params
Browse files Browse the repository at this point in the history
Signed-off-by: Dat Tien Nguyen <[email protected]>
  • Loading branch information
datbeohbbh committed Oct 8, 2024
1 parent 2aefbf6 commit ddb3255
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 0 deletions.
112 changes: 112 additions & 0 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5851,3 +5851,115 @@ fn test_switching_check_quorum() {
}
assert_eq!(sm.state, StateRole::Leader);
}

#[test]
fn test_disable_proposal_forwarding() {
let l = default_logger();

let n1 = new_test_raft_with_config(
&Config {
id: 1,
heartbeat_tick: 1,
election_tick: 10,
disable_proposal_forwarding: true,
..Default::default()
},
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
&l,
);

let n2 = new_test_raft_with_config(
&Config {
id: 2,
heartbeat_tick: 1,
election_tick: 10,
disable_proposal_forwarding: true,
..Default::default()
},
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
&l,
);

let n3 = new_test_raft_with_config(
&Config {
id: 3,
heartbeat_tick: 1,
election_tick: 10,
disable_proposal_forwarding: true,
..Default::default()
},
MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])),
&l,
);

let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l);

// node 1 starts campaign to become leader.
network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

assert_eq!(network.peers.get(&1).unwrap().state, StateRole::Leader);
assert_eq!(network.peers.get(&2).unwrap().state, StateRole::Follower);
assert_eq!(network.peers.get(&3).unwrap().state, StateRole::Follower);

assert_eq!(network.peers.get(&2).unwrap().leader_id, 1);
assert_eq!(network.peers.get(&3).unwrap().leader_id, 1);

let committed_index_1 = network.peers.get(&1).unwrap().raft_log.committed;

// nodes 2 and 3 are follower, with `disable_proposal_forwarding`` enable, proposal will be dropped.
assert_eq!(
network
.peers
.get_mut(&2)
.unwrap()
.step(new_message(2, 2, MessageType::MsgPropose, 1)),
Err(Error::ProposalDropped)
);
assert_eq!(
network
.peers
.get_mut(&3)
.unwrap()
.step(new_message(3, 3, MessageType::MsgPropose, 1)),
Err(Error::ProposalDropped)
);

// assert no proposals are forwarded from follower and committed.
assert_eq!(
network.peers.get(&1).unwrap().raft_log.committed,
committed_index_1
);
assert_eq!(
network.peers.get(&2).unwrap().raft_log.committed,
committed_index_1
);
assert_eq!(
network.peers.get(&3).unwrap().raft_log.committed,
committed_index_1
);

// send proposal to leader node.
network.send(vec![new_message(1, 1, MessageType::MsgPropose, 5)]);

let committed_index_2 = network.peers.get(&1).unwrap().raft_log.committed;
if committed_index_1 >= committed_index_2 {
panic!(
"expect committed_index_1 < committed_index_2, got {:?} > {:?}",
committed_index_1, committed_index_2
);
}

// assert proposals are accepted on leader node.
assert_eq!(
network.peers.get(&1).unwrap().raft_log.committed,
committed_index_2
);
assert_eq!(
network.peers.get(&2).unwrap().raft_log.committed,
committed_index_2
);
assert_eq!(
network.peers.get(&3).unwrap().raft_log.committed,
committed_index_2
);
}
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ pub struct Config {
/// Maximum raft log number that can be applied after commit but before persist.
/// The default value is 0, which means apply after both commit and persist.
pub max_apply_unpersisted_log_limit: u64,

/// If enable, followers will not forward proposal to leader.
pub disable_proposal_forwarding: bool,
}

impl Default for Config {
Expand All @@ -125,6 +128,7 @@ impl Default for Config {
max_uncommitted_size: NO_LIMIT,
max_committed_size_per_ready: NO_LIMIT,
max_apply_unpersisted_log_limit: 0,
disable_proposal_forwarding: false,
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ pub struct RaftCore<T: Storage> {

/// Max size per committed entries in a `Read`.
pub(crate) max_committed_size_per_ready: u64,

disable_proposal_forwarding: bool,
}

/// A struct that represents the raft consensus itself. Stores details concerning the current
Expand Down Expand Up @@ -363,6 +365,7 @@ impl<T: Storage> Raft<T> {
last_log_tail_index: 0,
},
max_committed_size_per_ready: c.max_committed_size_per_ready,
disable_proposal_forwarding: c.disable_proposal_forwarding,
},
};
confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
Expand Down Expand Up @@ -2337,6 +2340,15 @@ impl<T: Storage> Raft<T> {
term = self.term;
);
return Err(Error::ProposalDropped);
} else if self.disable_proposal_forwarding {
info!(
self.logger,
"{from} not forwarding to leader {to} at term {term}; dropping proposal",
from = self.id,
to = self.leader_id,
term = self.term;
);
return Err(Error::ProposalDropped);
}
m.to = self.leader_id;
self.r.send(m, &mut self.msgs);
Expand Down

0 comments on commit ddb3255

Please sign in to comment.