Skip to content

Commit

Permalink
feat(code/core-consensus): Add vote_sync_mode option to consensus p…
Browse files Browse the repository at this point in the history
…arameters (#870)

* Add `VoteSyncMode` option to the core consensus parameters

* Update VoteSync tests

* Only request vote set from peer if `vote_sync_mode == RequestResponse`

* Automatically enable the Sync actor if vote sync mode is request-response

* Only schedule the rebroadcast timer if vote sync mode is `rebroadcast`

* Ensure `vote_sync` section is present in the config generated by `init` and `testnet` commands

* Fix test suite

* Remove leftover from original PR

* Use initial height by default instead of hardcoding 1

* Fix duplicate tracing span in example app

* Rename `sync` config option to `value_sync`

* Rename `TestParams::enable_sync` to `TestParams::enable_value_sync`

* Spawn Sync actor if either `value_sync.enabled` is true or `consensus.vote_sync.mode` is `request-response`

* Update example config file
  • Loading branch information
romac authored Feb 24, 2025
1 parent 5aec3d5 commit ad7350e
Show file tree
Hide file tree
Showing 33 changed files with 443 additions and 184 deletions.
3 changes: 2 additions & 1 deletion code/crates/app-channel/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ where
ctx.clone(),
network.clone(),
connector.clone(),
&cfg.sync,
&cfg.value_sync,
&cfg.consensus.vote_sync,
&registry,
)
.await?;
Expand Down
18 changes: 14 additions & 4 deletions code/crates/app/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ use malachitebft_engine::util::events::TxEvent;
use malachitebft_engine::wal::{Wal, WalCodec, WalRef};
use malachitebft_network::{Config as NetworkConfig, DiscoveryConfig, GossipSubConfig, Keypair};

use crate::types::config::{Config as NodeConfig, PubSubProtocol, SyncConfig, TransportProtocol};
use crate::types::config::{
self as config, Config as NodeConfig, PubSubProtocol, TransportProtocol, ValueSyncConfig,
VoteSyncConfig,
};
use crate::types::core::{Context, SigningProvider};
use crate::types::metrics::{Metrics, SharedRegistry};
use crate::types::sync;
use crate::types::ValuePayload;
use crate::types::{ValuePayload, VoteSyncMode};

pub async fn spawn_node_actor<Ctx>(
ctx: Ctx,
Expand Down Expand Up @@ -92,12 +95,18 @@ where
config::ValuePayload::ProposalAndParts => ValuePayload::ProposalAndParts,
};

let vote_sync_mode = match cfg.consensus.vote_sync.mode {
config::VoteSyncMode::RequestResponse => VoteSyncMode::RequestResponse,
config::VoteSyncMode::Rebroadcast => VoteSyncMode::Rebroadcast,
};

let consensus_params = ConsensusParams {
initial_height,
initial_validator_set,
address,
threshold_params: Default::default(),
value_payload,
vote_sync_mode,
};

Consensus::spawn(
Expand Down Expand Up @@ -141,13 +150,14 @@ pub async fn spawn_sync_actor<Ctx>(
ctx: Ctx,
network: NetworkRef<Ctx>,
host: HostRef<Ctx>,
config: &SyncConfig,
config: &ValueSyncConfig,
vote_sync: &VoteSyncConfig,
registry: &SharedRegistry,
) -> Result<Option<SyncRef<Ctx>>>
where
Ctx: Context,
{
if !config.enabled {
if !config.enabled && vote_sync.mode != config::VoteSyncMode::RequestResponse {
return Ok(None);
}

Expand Down
2 changes: 1 addition & 1 deletion code/crates/app/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Re-export of all types required to build a Malachite application.
pub use malachitebft_core_consensus::{
ConsensusMsg, ProposedValue, SignedConsensusMsg, ValuePayload,
ConsensusMsg, ProposedValue, SignedConsensusMsg, ValuePayload, VoteSyncMode,
};
pub use malachitebft_engine::host::LocallyProposedValue;
pub use malachitebft_peer::PeerId;
Expand Down
33 changes: 28 additions & 5 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub struct Config {
/// Mempool configuration options
pub mempool: MempoolConfig,

/// Sync configuration options
pub sync: SyncConfig,
/// ValueSync configuration options
pub value_sync: ValueSyncConfig,

/// Metrics configuration options
pub metrics: MetricsConfig,
Expand Down Expand Up @@ -352,9 +352,10 @@ pub struct MempoolConfig {
pub gossip_batch_size: usize,
}

/// ValueSync configuration options
#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SyncConfig {
/// Enable Sync
pub struct ValueSyncConfig {
/// Enable ValueSync
pub enabled: bool,

/// Interval at which to update other peers of our status
Expand All @@ -366,7 +367,7 @@ pub struct SyncConfig {
pub request_timeout: Duration,
}

impl Default for SyncConfig {
impl Default for ValueSyncConfig {
fn default() -> Self {
Self {
enabled: true,
Expand All @@ -385,6 +386,28 @@ pub struct ConsensusConfig {

/// P2P configuration options
pub p2p: P2pConfig,

/// VoteSync configuration options
pub vote_sync: VoteSyncConfig,
}

/// VoteSync configuration options
///
/// The derived `Default` uses the default [`VoteSyncMode`] for `mode`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct VoteSyncConfig {
/// The mode of vote synchronization
///
/// - RequestResponse: The lagging node sends a request to a peer for the missing votes
/// - Rebroadcast: Nodes rebroadcast their last vote to all peers
pub mode: VoteSyncMode,
}

/// The mode of vote synchronization
///
/// Variants are (de)serialized in kebab-case, i.e. as `request-response`
/// and `rebroadcast`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum VoteSyncMode {
/// The lagging node sends a request to a peer for the missing votes
///
/// This is the default mode.
#[default]
RequestResponse,
/// Nodes rebroadcast their last vote to all peers
Rebroadcast,
}

/// Message types required by consensus to deliver the value being proposed
Expand Down
2 changes: 1 addition & 1 deletion code/crates/core-consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ where
/// Consensus has been stuck in Prevote or Precommit step, ask for vote sets from peers
///
/// Resume with: [`resume::Continue`]
GetVoteSet(Ctx::Height, Round, resume::Continue),
RequestVoteSet(Ctx::Height, Round, resume::Continue),

/// A peer has requested our vote set, send the response
///
Expand Down
14 changes: 9 additions & 5 deletions code/crates/core-consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::handle::vote::on_vote;
use crate::prelude::*;
use crate::types::SignedConsensusMsg;
use crate::util::pretty::PrettyVal;
use crate::VoteSyncMode;

#[async_recursion]
pub async fn apply_driver_input<Ctx>(
Expand Down Expand Up @@ -267,12 +268,15 @@ where

state.set_last_vote(signed_vote);

let timeout = match vote_type {
VoteType::Prevote => Timeout::prevote_rebroadcast(state.driver.round()),
VoteType::Precommit => Timeout::precommit_rebroadcast(state.driver.round()),
};
// Schedule rebroadcast timer if necessary
if state.params.vote_sync_mode == VoteSyncMode::Rebroadcast {
let timeout = match vote_type {
VoteType::Prevote => Timeout::prevote_rebroadcast(state.driver.round()),
VoteType::Precommit => Timeout::precommit_rebroadcast(state.driver.round()),
};

perform!(co, Effect::ScheduleTimeout(timeout, Default::default()));
perform!(co, Effect::ScheduleTimeout(timeout, Default::default()));
}
}

Ok(())
Expand Down
6 changes: 5 additions & 1 deletion code/crates/core-consensus/src/handle/rebroadcast_timeout.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::*;
use crate::{prelude::*, VoteSyncMode};

#[cfg_attr(not(feature = "metrics"), allow(unused_variables))]
pub async fn on_rebroadcast_timeout<Ctx>(
Expand All @@ -10,6 +10,10 @@ pub async fn on_rebroadcast_timeout<Ctx>(
where
Ctx: Context,
{
if state.params.vote_sync_mode != VoteSyncMode::Rebroadcast {
return Ok(());
}

let (height, round) = (state.driver.height(), state.driver.round());

let (maybe_vote, timeout) = match timeout.kind {
Expand Down
20 changes: 13 additions & 7 deletions code/crates/core-consensus/src/handle/step_timeout.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::prelude::*;
use crate::{prelude::*, VoteSyncMode};

#[cfg_attr(not(feature = "metrics"), allow(unused_variables))]
pub async fn on_step_limit_timeout<Ctx>(
Expand All @@ -11,13 +11,19 @@ where
Ctx: Context,
{
warn!(
height = %state.driver.height(), round = %state.driver.round(),
"Consensus is halted in {:?} step, start vote synchronization", state.driver.step());
height = %state.driver.height(), %round,
"Consensus is halted in {:?} step", state.driver.step());

perform!(
co,
Effect::GetVoteSet(state.driver.height(), round, Default::default())
);
if state.params.vote_sync_mode == VoteSyncMode::RequestResponse {
warn!(
height = %state.driver.height(), %round,
"Requesting vote set");

perform!(
co,
Effect::RequestVoteSet(state.driver.height(), round, Default::default())
);
}

#[cfg(feature = "metrics")]
metrics.step_timeouts.inc();
Expand Down
2 changes: 1 addition & 1 deletion code/crates/core-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod error;
pub use error::Error;

mod params;
pub use params::{Params, ThresholdParams};
pub use params::{Params, ThresholdParams, VoteSyncMode};

mod effect;
pub use effect::{Effect, Resumable, Resume};
Expand Down
13 changes: 13 additions & 0 deletions code/crates/core-consensus/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,17 @@ pub struct Params<Ctx: Context> {

/// The messages required to deliver proposals
pub value_payload: ValuePayload,

/// The VoteSync mode
pub vote_sync_mode: VoteSyncMode,
}

/// The mode of vote synchronization
///
/// Selects how votes are synchronized between nodes; the default is
/// [`VoteSyncMode::RequestResponse`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum VoteSyncMode {
/// The lagging node sends a request to a peer for the missing votes
///
/// This is the default mode.
#[default]
RequestResponse,
/// Nodes rebroadcast their last vote to all peers
Rebroadcast,
}
11 changes: 5 additions & 6 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{debug, error, error_span, info, warn};
use malachitebft_codec as codec;
use malachitebft_config::TimeoutConfig;
use malachitebft_core_consensus::{
Effect, PeerId, Resumable, Resume, SignedConsensusMsg, VoteExtensionError,
Effect, PeerId, Resumable, Resume, SignedConsensusMsg, VoteExtensionError, VoteSyncMode,
};
use malachitebft_core_types::{
Context, Round, SigningProvider, SigningProviderExt, Timeout, TimeoutKind, ValidatorSet,
Expand Down Expand Up @@ -976,10 +976,9 @@ where
}

Effect::Rebroadcast(msg, r) => {
// Rebroadcast last vote only if sync is not enabled, otherwise vote set requests are issued.
// TODO - there is currently no easy access to the sync configuration. In addition there is
// a single configuration for both value and vote sync.
if self.sync.is_none() {
// Rebroadcast last vote only if vote sync mode is set to "rebroadcast",
// otherwise vote set requests are issued automatically by the sync protocol.
if self.params.vote_sync_mode == VoteSyncMode::Rebroadcast {
// Notify any subscribers that we are about to rebroadcast a message
self.tx_event.send(|| Event::Rebroadcast(msg.clone()));

Expand Down Expand Up @@ -1047,7 +1046,7 @@ where
Ok(r.resume_with(()))
}

Effect::GetVoteSet(height, round, r) => {
Effect::RequestVoteSet(height, round, r) => {
if let Some(sync) = &self.sync {
debug!(%height, %round, "Request sync to obtain the vote set from peers");

Expand Down
16 changes: 12 additions & 4 deletions code/crates/starknet/host/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use tokio::task::JoinHandle;
use tracing::warn;

use malachitebft_config::{
self as config, Config as NodeConfig, MempoolConfig, SyncConfig, TestConfig, TransportProtocol,
self as config, Config as NodeConfig, MempoolConfig, TestConfig, TransportProtocol,
ValueSyncConfig, VoteSyncConfig,
};
use malachitebft_core_consensus::VoteSyncMode;
use malachitebft_core_types::ValuePayload;
use malachitebft_engine::consensus::{Consensus, ConsensusParams, ConsensusRef};
use malachitebft_engine::host::HostRef;
Expand Down Expand Up @@ -73,7 +75,8 @@ pub async fn spawn_node_actor(
ctx,
network.clone(),
host.clone(),
&cfg.sync,
&cfg.value_sync,
&cfg.consensus.vote_sync,
&registry,
&span,
)
Expand Down Expand Up @@ -127,11 +130,12 @@ async fn spawn_sync_actor(
ctx: MockContext,
network: NetworkRef<MockContext>,
host: HostRef<MockContext>,
config: &SyncConfig,
config: &ValueSyncConfig,
vote_sync: &VoteSyncConfig,
registry: &SharedRegistry,
span: &tracing::Span,
) -> Option<SyncRef<MockContext>> {
if !config.enabled {
if !config.enabled && vote_sync.mode != config::VoteSyncMode::RequestResponse {
return None;
}

Expand Down Expand Up @@ -170,6 +174,10 @@ async fn spawn_consensus_actor(
address,
threshold_params: Default::default(),
value_payload: ValuePayload::PartsOnly,
vote_sync_mode: match cfg.consensus.vote_sync.mode {
config::VoteSyncMode::RequestResponse => VoteSyncMode::RequestResponse,
config::VoteSyncMode::Rebroadcast => VoteSyncMode::Rebroadcast,
},
};

Consensus::spawn(
Expand Down
5 changes: 4 additions & 1 deletion code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ impl TestRunner {
moniker: format!("node-{}", node),
logging: LoggingConfig::default(),
consensus: ConsensusConfig {
vote_sync: VoteSyncConfig {
mode: VoteSyncMode::Rebroadcast,
},
timeouts: TimeoutConfig::default(),
p2p: P2pConfig {
transport,
Expand Down Expand Up @@ -168,7 +171,7 @@ impl TestRunner {
max_tx_count: 10000,
gossip_batch_size: 100,
},
sync: SyncConfig {
value_sync: ValueSyncConfig {
enabled: true,
status_update_interval: Duration::from_secs(2),
request_timeout: Duration::from_secs(5),
Expand Down
6 changes: 3 additions & 3 deletions code/crates/starknet/test/src/tests/full_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub async fn full_node_crash_and_sync() {
.run_with_params(
Duration::from_secs(60),
TestParams {
enable_sync: true,
enable_value_sync: true,
..Default::default()
},
)
Expand Down Expand Up @@ -121,7 +121,7 @@ pub async fn late_starting_full_node() {
.run_with_params(
Duration::from_secs(60),
TestParams {
enable_sync: true,
enable_value_sync: true,
..Default::default()
},
)
Expand Down Expand Up @@ -175,7 +175,7 @@ pub async fn mixed_validator_and_full_node_failures() {
.run_with_params(
Duration::from_secs(60),
TestParams {
enable_sync: true,
enable_value_sync: true,
..Default::default()
},
)
Expand Down
Loading

0 comments on commit ad7350e

Please sign in to comment.