From 4fd152af37370a412eb49bb0068ec4e41aa78f05 Mon Sep 17 00:00:00 2001 From: Tumas Date: Thu, 21 Nov 2024 15:19:49 +0200 Subject: [PATCH 1/7] Add missing gossipsub scoring parameter update --- p2p/src/messages.rs | 1 + p2p/src/network.rs | 30 ++++++++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/p2p/src/messages.rs b/p2p/src/messages.rs index 377c8324..ca1662df 100644 --- a/p2p/src/messages.rs +++ b/p2p/src/messages.rs @@ -204,6 +204,7 @@ pub enum ServiceInboundMessage { UnsubscribeFromForkTopicsExcept(ForkDigest), UpdateEnrSubnet(Subnet, bool), UpdateForkVersion(EnrForkId), + UpdateGossipsubParameters(u64, Slot), } impl ServiceInboundMessage

{ diff --git a/p2p/src/network.rs b/p2p/src/network.rs index 17d3fd63..ade0b763 100644 --- a/p2p/src/network.rs +++ b/p2p/src/network.rs @@ -29,7 +29,7 @@ use futures::{ select, stream::StreamExt as _, }; -use helper_functions::misc; +use helper_functions::{accessors, misc}; use log::{debug, error, info, trace, warn}; use logging::PEER_LOG_METRICS; use operation_pools::{BlsToExecutionChangePool, Origin, PoolToP2pMessage, SyncCommitteeAggPool}; @@ -39,12 +39,13 @@ use slog::{o, Drain as _, Logger}; use slog_stdlog::StdLog; use std_ext::ArcExt as _; use thiserror::Error; +use tokio_stream::wrappers::IntervalStream; use types::{ altair::containers::{SignedContributionAndProof, SyncCommitteeMessage}, capella::containers::SignedBlsToExecutionChange, combined::{Attestation, AttesterSlashing, SignedAggregateAndProof, SignedBeaconBlock}, deneb::containers::{BlobIdentifier, BlobSidecar}, - nonstandard::{Phase, WithStatus}, + nonstandard::{Phase, RelativeEpoch, WithStatus}, phase0::{ consts::{FAR_FUTURE_EPOCH, GENESIS_EPOCH}, containers::{ProposerSlashing, SignedVoluntaryExit}, @@ -63,6 +64,7 @@ use crate::{ upnp::PortMappings, }; +const GOSSIPSUB_PARAMETER_UPDATE_INTERVAL: Duration = Duration::from_secs(60); const MAX_FOR_DOS_PREVENTION: u64 = 64; /// Number of slots before a new phase to subscribe to its topics. @@ -201,8 +203,15 @@ impl Network

{ #[allow(clippy::too_many_lines)] pub async fn run(mut self) -> Result { + let mut gossipsub_parameter_update_interval = + IntervalStream::new(tokio::time::interval(GOSSIPSUB_PARAMETER_UPDATE_INTERVAL)).fuse(); + loop { select! { + _ = gossipsub_parameter_update_interval.select_next_some() => { + self.update_gossipsub_parameters(); + }, + message = self.service_to_network_rx.select_next_some() => { match message { ServiceOutboundMessage::NetworkEvent(network_event) => { @@ -718,6 +727,15 @@ impl Network

{ } } + fn update_gossipsub_parameters(&self) { + let head_state = self.controller.head_state().value(); + let active_validator_count = + accessors::active_validator_count_u64(&head_state, RelativeEpoch::Current); + + ServiceInboundMessage::UpdateGossipsubParameters(active_validator_count, head_state.slot()) + .send(&self.network_to_service_tx); + } + fn update_sync_committee_subnets( &self, actions: BTreeMap, @@ -1879,6 +1897,14 @@ fn run_network_service( ServiceInboundMessage::UpdateForkVersion(enr_fork_id) => { service.update_fork_version(enr_fork_id); } + ServiceInboundMessage::UpdateGossipsubParameters(active_validator_count, slot) => { + if let Err(error) = service.update_gossipsub_parameters( + active_validator_count, + slot + ) { + warn!("unable to update gossipsub scoring parameters: {error:?}"); + } + } } } } From 94f3a448552109101edd2fddc3e31432b3ff59ff Mon Sep 17 00:00:00 2001 From: Tumas Date: Thu, 21 Nov 2024 16:06:01 +0200 Subject: [PATCH 2/7] Remove old gossipsub topic weights when switching to a new fork --- p2p/src/messages.rs | 2 +- p2p/src/network.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/p2p/src/messages.rs b/p2p/src/messages.rs index ca1662df..0e6ed7d3 100644 --- a/p2p/src/messages.rs +++ b/p2p/src/messages.rs @@ -203,7 +203,7 @@ pub enum ServiceInboundMessage { Unsubscribe(GossipTopic), UnsubscribeFromForkTopicsExcept(ForkDigest), UpdateEnrSubnet(Subnet, bool), - UpdateForkVersion(EnrForkId), + UpdateFork(EnrForkId), UpdateGossipsubParameters(u64, Slot), } diff --git a/p2p/src/network.rs b/p2p/src/network.rs index ade0b763..d2936d25 100644 --- a/p2p/src/network.rs +++ b/p2p/src/network.rs @@ -457,8 +457,7 @@ impl Network

{ let new_enr_fork_id = Self::enr_fork_id(&self.controller, &self.fork_context, slot); - ServiceInboundMessage::UpdateForkVersion(new_enr_fork_id) - .send(&self.network_to_service_tx); + ServiceInboundMessage::UpdateFork(new_enr_fork_id).send(&self.network_to_service_tx); } // Subscribe to the topics of the next phase. @@ -1894,8 +1893,9 @@ fn run_network_service( ServiceInboundMessage::UpdateEnrSubnet(subnet, advertise) => { service.update_enr_subnet(subnet, advertise); } - ServiceInboundMessage::UpdateForkVersion(enr_fork_id) => { + ServiceInboundMessage::UpdateFork(enr_fork_id) => { service.update_fork_version(enr_fork_id); + service.remove_topic_weight_except(enr_fork_id.fork_digest); } ServiceInboundMessage::UpdateGossipsubParameters(active_validator_count, slot) => { if let Err(error) = service.update_gossipsub_parameters( From 196b257520bca5e5d25863eb20ea4be0a72cca08 Mon Sep 17 00:00:00 2001 From: Povilas Liubauskas Date: Wed, 27 Nov 2024 14:49:31 +0200 Subject: [PATCH 3/7] Do not produce beacon block when there is no execution payload available for inclusion --- block_producer/src/block_producer.rs | 64 ++++++++++++++++++++-------- block_producer/src/lib.rs | 2 +- http_api/src/context.rs | 5 ++- runtime/src/runtime.rs | 1 + 4 files changed, 53 insertions(+), 19 deletions(-) diff --git a/block_producer/src/block_producer.rs b/block_producer/src/block_producer.rs index bd9ddd3d..381ac012 100644 --- a/block_producer/src/block_producer.rs +++ b/block_producer/src/block_producer.rs @@ -101,6 +101,11 @@ pub type ExecutionPayloadHeaderJoinHandle

= JoinHandle = JoinHandle, P>>>; +#[derive(Default)] +pub struct Options { + pub fake_execution_payloads: bool, +} + pub struct BlockProducer { producer_context: Arc>, } @@ -118,7 +123,12 @@ impl BlockProducer { bls_to_execution_change_pool: Arc, sync_committee_agg_pool: Arc>, metrics: Option>, + options: Option, ) -> Self { + let Options { + fake_execution_payloads, + } = options.unwrap_or_default(); + let producer_context = Arc::new(ProducerContext { chain_config: controller.chain_config().clone_arc(), proposer_configs, @@ -137,6 +147,7 @@ impl BlockProducer { payload_cache: Mutex::new(SizedCache::with_size(PAYLOAD_CACHE_SIZE)), payload_id_cache: Mutex::new(SizedCache::with_size(PAYLOAD_ID_CACHE_SIZE)), metrics, + fake_execution_payloads, }); Self { producer_context } @@ -574,6 +585,7 @@ struct ProducerContext { payload_cache: Mutex, P>>>, payload_id_cache: Mutex>, metrics: Option>, + fake_execution_payloads: bool, } #[derive(Clone, Copy, Default)] @@ -1019,29 +1031,24 @@ impl BlockBuildContext { }; let WithBlobsAndMev { - value: mut execution_payload, + value: execution_payload, commitments, proofs, blobs, mev, execution_requests, - } = with_blobs_and_mev.unwrap_or_else(|| WithBlobsAndMev::with_default(None)); - - let slot = self.beacon_state.slot(); + } = match with_blobs_and_mev { + Some(payload_with_mev) => payload_with_mev, + None => { + if self.beacon_state.post_capella().is_some() + || post_merge_state(&self.beacon_state).is_some() + { + return Err(AnyhowError::msg("no execution payload to include in block")); + } - // Starting with Capella, all blocks must be post-Merge. - // Construct a superficially valid execution payload for snapshot testing. - // It will almost always be invalid in a real network, but so would a default payload. - // Construct the payload with a fictitious `ExecutionBlockHash` derived from the slot. - // Computing the real `ExecutionBlockHash` would make maintaining tests much harder. - if self.beacon_state.phase() >= Phase::Capella && execution_payload.is_none() { - execution_payload = Some(factory::execution_payload( - &self.producer_context.chain_config, - &self.beacon_state, - slot, - ExecutionBlockHash::from_low_u64_be(slot), - )?); - } + WithBlobsAndMev::with_default(None) + } + }; let without_state_root_with_payload = block_without_state_root .with_execution_payload(execution_payload)? @@ -1703,6 +1710,29 @@ impl BlockBuildContext { async fn local_execution_payload_option( &self, ) -> Option, P>> { + if self.producer_context.fake_execution_payloads { + let slot = self.beacon_state.slot(); + + // Starting with Capella, all blocks must be post-Merge. + // Construct a superficially valid execution payload for snapshot testing. + // It will almost always be invalid in a real network, but so would a default payload. + // Construct the payload with a fictitious `ExecutionBlockHash` derived from the slot. + // Computing the real `ExecutionBlockHash` would make maintaining tests much harder. + if self.beacon_state.phase() >= Phase::Capella { + let execution_payload = factory::execution_payload( + &self.producer_context.chain_config, + &self.beacon_state, + slot, + ExecutionBlockHash::from_low_u64_be(slot), + ); + + match execution_payload { + Ok(payload) => return Some(WithBlobsAndMev::with_default(payload)), + Err(error) => panic!("failed to produce fake payload: {error:?}"), + }; + } + } + let _timer = self .producer_context .metrics diff --git a/block_producer/src/lib.rs b/block_producer/src/lib.rs index 311cfb4f..18ef37ec 100644 --- a/block_producer/src/lib.rs +++ b/block_producer/src/lib.rs @@ -1,4 +1,4 @@ -pub use block_producer::{BlockBuildOptions, BlockProducer}; +pub use block_producer::{BlockBuildOptions, BlockProducer, Options}; pub use misc::{ProposerData, ValidatorBlindedBlock}; mod block_producer; diff --git a/http_api/src/context.rs b/http_api/src/context.rs index d4a63b72..bbef5d4b 100644 --- a/http_api/src/context.rs +++ b/http_api/src/context.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::Result; use attestation_verifier::AttestationVerifier; -use block_producer::BlockProducer; +use block_producer::{BlockProducer, Options as BlockProducerOptions}; use bls::{PublicKeyBytes, SecretKey}; use clock::Tick; use database::Database; @@ -284,6 +284,9 @@ impl Context

{ bls_to_execution_change_pool.clone_arc(), sync_committee_agg_pool.clone_arc(), None, + Some(BlockProducerOptions { + fake_execution_payloads: true, + }), )); let validator_channels = ValidatorChannels { diff --git a/runtime/src/runtime.rs b/runtime/src/runtime.rs index 4c9e36b6..693114b1 100644 --- a/runtime/src/runtime.rs +++ b/runtime/src/runtime.rs @@ -501,6 +501,7 @@ pub async fn run_after_genesis( bls_to_execution_change_pool.clone_arc(), sync_committee_agg_pool.clone_arc(), metrics.clone(), + None, )); let validator_channels = ValidatorChannels { From 3e194ed13b9700311248f4376e3844a64e5490b4 Mon Sep 17 00:00:00 2001 From: Povilas Liubauskas Date: Wed, 27 Nov 2024 17:07:09 +0200 Subject: [PATCH 4/7] Feature gate `prometheus_metrics` dependency in `helper_functions` and `transition_functions`. App will compile and run with metrics feature enabled in `*_functions`, since `runtime` requires `metrics` module, and it itself require `*_functions` with metrics feature. --- helper_functions/Cargo.toml | 5 ++++- helper_functions/src/accessors.rs | 9 ++++++++- metrics/Cargo.toml | 4 ++-- transition_functions/Cargo.toml | 5 ++++- transition_functions/src/altair/block_processing.rs | 5 ++++- transition_functions/src/altair/epoch_processing.rs | 5 ++++- transition_functions/src/bellatrix/block_processing.rs | 5 ++++- transition_functions/src/bellatrix/epoch_processing.rs | 5 ++++- .../src/capella/blinded_block_processing.rs | 5 ++++- transition_functions/src/capella/block_processing.rs | 5 ++++- transition_functions/src/capella/epoch_processing.rs | 5 ++++- transition_functions/src/combined.rs | 5 ++++- .../src/deneb/blinded_block_processing.rs | 5 ++++- transition_functions/src/deneb/block_processing.rs | 5 ++++- transition_functions/src/deneb/epoch_processing.rs | 5 ++++- .../src/electra/blinded_block_processing.rs | 5 ++++- transition_functions/src/electra/block_processing.rs | 5 ++++- transition_functions/src/electra/epoch_processing.rs | 5 ++++- transition_functions/src/phase0/block_processing.rs | 5 ++++- transition_functions/src/phase0/epoch_processing.rs | 5 ++++- 20 files changed, 82 insertions(+), 21 deletions(-) diff --git a/helper_functions/Cargo.toml b/helper_functions/Cargo.toml index d0111687..397fc7da 100644 --- a/helper_functions/Cargo.toml +++ b/helper_functions/Cargo.toml @@ -19,7 +19,7 @@ im = { workspace = true } itertools = { workspace = true } num-integer = { workspace = true } parse-display = { workspace = true } -prometheus_metrics = { workspace = true } +prometheus_metrics = { workspace = true, optional = true } rayon = { workspace = true } rc-box = { workspace = true } serde = { workspace = true } @@ -41,3 +41,6 @@ nonzero_ext = { workspace = true } spec_test_utils = { workspace = true } test-case = { workspace = true } test-generator = { workspace = true } + +[features] +metrics = ['prometheus_metrics'] diff --git a/helper_functions/src/accessors.rs b/helper_functions/src/accessors.rs index 98720751..f9805fde 100644 --- a/helper_functions/src/accessors.rs +++ b/helper_functions/src/accessors.rs @@ -12,7 +12,6 @@ use bls::{AggregatePublicKey, CachedPublicKey, PublicKeyBytes}; use im::HashMap; use itertools::{EitherOrBoth, Itertools as _}; use num_integer::Roots as _; -use prometheus_metrics::METRICS; use rc_box::ArcBox; use ssz::{ContiguousVector, FitsInU64, Hc, SszHash as _}; use std_ext::CopyExt as _; @@ -47,6 +46,9 @@ use types::{ use crate::{error::Error, misc, predicates}; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + #[must_use] pub fn get_previous_epoch(state: &impl BeaconState

) -> Epoch { get_current_epoch(state) @@ -197,6 +199,7 @@ pub fn get_or_init_validator_indices( ) -> &HashMap { state.cache().validator_indices.get_or_init(|| { if report_cache_miss { + #[cfg(feature = "metrics")] if let Some(metrics) = METRICS.get() { metrics.validator_indices_init_count.inc(); } @@ -264,6 +267,7 @@ pub fn get_or_init_active_validator_indices_ordered( state.cache().active_validator_indices_ordered[relative_epoch].get_or_init(|| { if report_cache_miss { + #[cfg(feature = "metrics")] if let Some(metrics) = METRICS.get() { metrics.active_validator_indices_ordered_init_count.inc(); } @@ -316,6 +320,7 @@ where state.cache().active_validator_indices_shuffled[relative_epoch].get_or_init(|| { if report_cache_miss { + #[cfg(feature = "metrics")] if let Some(metrics) = METRICS.get() { metrics.active_validator_indices_shuffled_init_count.inc(); } @@ -463,6 +468,7 @@ pub fn get_or_try_init_beacon_proposer_index( .proposer_index .get_or_try_init(|| { if report_cache_miss { + #[cfg(feature = "metrics")] if let Some(metrics) = METRICS.get() { metrics.beacon_proposer_index_init_count.inc(); } @@ -527,6 +533,7 @@ pub fn get_or_init_total_active_balance( state.cache().total_active_balance[RelativeEpoch::Current] .get_or_init(|| { if report_cache_miss { + #[cfg(feature = "metrics")] if let Some(metrics) = METRICS.get() { metrics.total_active_balance_init_count.inc(); } diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 69be48e3..c4fb4421 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -18,7 +18,7 @@ eth2_libp2p = { workspace = true } fork_choice_control = { workspace = true } futures = { workspace = true } grandine_version = { workspace = true } -helper_functions = { workspace = true } +helper_functions = { workspace = true, features = ['metrics'] } http_api_utils = { workspace = true } jemalloc-ctl = { workspace = true } log = { workspace = true } @@ -37,7 +37,7 @@ thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tower-http = { workspace = true } -transition_functions = { workspace = true } +transition_functions = { workspace = true, features = ['metrics'] } types = { workspace = true } [dev-dependencies] diff --git a/transition_functions/Cargo.toml b/transition_functions/Cargo.toml index 49a43481..5956963c 100644 --- a/transition_functions/Cargo.toml +++ b/transition_functions/Cargo.toml @@ -18,7 +18,7 @@ hashing = { workspace = true } helper_functions = { workspace = true } itertools = { workspace = true } num-integer = { workspace = true } -prometheus_metrics = { workspace = true } +prometheus_metrics = { workspace = true, optional = true } rayon = { workspace = true } serde = { workspace = true } ssz = { workspace = true } @@ -35,3 +35,6 @@ unwrap_none = { workspace = true } duplicate = { workspace = true } spec_test_utils = { workspace = true } test-generator = { workspace = true } + +[features] +metrics = ['helper_functions/metrics', 'prometheus_metrics'] diff --git a/transition_functions/src/altair/block_processing.rs b/transition_functions/src/altair/block_processing.rs index 6544189a..f817fa24 100644 --- a/transition_functions/src/altair/block_processing.rs +++ b/transition_functions/src/altair/block_processing.rs @@ -15,7 +15,6 @@ use helper_functions::{ slot_report::{Delta, NullSlotReport, SlotReport, SyncAggregateRewards}, verifier::{SingleVerifier, Triple, Verifier}, }; -use prometheus_metrics::METRICS; use rayon::iter::{IntoParallelRefIterator as _, ParallelIterator as _}; use std_ext::ArcExt as _; use typenum::Unsigned as _; @@ -47,6 +46,9 @@ use crate::{ unphased::{self, CombinedDeposit, Error}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + /// /// /// This also serves as a substitute for [`compute_new_state_root`]. `compute_new_state_root` as @@ -65,6 +67,7 @@ pub fn process_block( mut verifier: impl Verifier, slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.block_transition_times.start_timer()); diff --git a/transition_functions/src/altair/epoch_processing.rs b/transition_functions/src/altair/epoch_processing.rs index e9cd5c9f..286bafb9 100644 --- a/transition_functions/src/altair/epoch_processing.rs +++ b/transition_functions/src/altair/epoch_processing.rs @@ -9,7 +9,6 @@ use helper_functions::{ mutators::decrease_balance, predicates::is_in_inactivity_leak, }; -use prometheus_metrics::METRICS; use ssz::PersistentList; use typenum::Unsigned as _; use types::{ @@ -29,6 +28,9 @@ use super::epoch_intermediates::{ }; use crate::unphased::{self, SlashingPenalties}; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub struct EpochReport { pub statistics: Statistics, pub summaries: Vec, @@ -38,6 +40,7 @@ pub struct EpochReport { } pub fn process_epoch(config: &Config, state: &mut AltairBeaconState) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.epoch_processing_times.start_timer()); diff --git a/transition_functions/src/bellatrix/block_processing.rs b/transition_functions/src/bellatrix/block_processing.rs index 5cb1e920..44088e30 100644 --- a/transition_functions/src/bellatrix/block_processing.rs +++ b/transition_functions/src/bellatrix/block_processing.rs @@ -10,7 +10,6 @@ use helper_functions::{ slot_report::SlotReport, verifier::{SingleVerifier, Triple, Verifier}, }; -use prometheus_metrics::METRICS; use rayon::iter::{IntoParallelRefIterator as _, ParallelIterator as _}; use ssz::SszHash as _; use typenum::Unsigned as _; @@ -34,6 +33,9 @@ use crate::{ unphased::{self, Error}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + /// /// /// This also serves as a substitute for [`compute_new_state_root`]. `compute_new_state_root` as @@ -52,6 +54,7 @@ pub fn process_block( mut verifier: impl Verifier, slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.block_transition_times.start_timer()); diff --git a/transition_functions/src/bellatrix/epoch_processing.rs b/transition_functions/src/bellatrix/epoch_processing.rs index 0c161346..2b93e56a 100644 --- a/transition_functions/src/bellatrix/epoch_processing.rs +++ b/transition_functions/src/bellatrix/epoch_processing.rs @@ -6,7 +6,6 @@ use helper_functions::{ misc::vec_of_default, mutators::decrease_balance, }; -use prometheus_metrics::METRICS; use typenum::Unsigned as _; use types::{ bellatrix::beacon_state::BeaconState as CapellaBeaconState, config::Config, @@ -19,7 +18,11 @@ use crate::{ unphased::{self, SlashingPenalties}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub fn process_epoch(config: &Config, state: &mut CapellaBeaconState) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.epoch_processing_times.start_timer()); diff --git a/transition_functions/src/capella/blinded_block_processing.rs b/transition_functions/src/capella/blinded_block_processing.rs index a1290aa6..d4ec6e73 100644 --- a/transition_functions/src/capella/blinded_block_processing.rs +++ b/transition_functions/src/capella/blinded_block_processing.rs @@ -7,7 +7,6 @@ use helper_functions::{ slot_report::SlotReport, verifier::Verifier, }; -use prometheus_metrics::METRICS; use ssz::{ContiguousList, SszHash as _}; use tap::TryConv as _; use typenum::{NonZero, Unsigned as _}; @@ -27,6 +26,9 @@ use crate::{ unphased::{self, Error}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub fn custom_process_blinded_block( config: &Config, state: &mut BeaconState

, @@ -34,6 +36,7 @@ pub fn custom_process_blinded_block( mut verifier: impl Verifier, mut slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.blinded_block_transition_times.start_timer()); diff --git a/transition_functions/src/capella/block_processing.rs b/transition_functions/src/capella/block_processing.rs index 5d360503..bf9cd479 100644 --- a/transition_functions/src/capella/block_processing.rs +++ b/transition_functions/src/capella/block_processing.rs @@ -13,7 +13,6 @@ use helper_functions::{ verifier::{SingleVerifier, Triple, Verifier}, }; use itertools::izip; -use prometheus_metrics::METRICS; use rayon::iter::{IntoParallelRefIterator as _, ParallelIterator as _}; use ssz::SszHash as _; use tap::Pipe as _; @@ -40,6 +39,9 @@ use crate::{ unphased::{self, Error}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + /// [`process_block`](https://github.com/ethereum/consensus-specs/blob/0b76c8367ed19014d104e3fbd4718e73f459a748/specs/capella/beacon-chain.md#block-processing) /// /// This also serves as a substitute for [`compute_new_state_root`]. `compute_new_state_root` as @@ -58,6 +60,7 @@ pub fn process_block( mut verifier: impl Verifier, slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.block_transition_times.start_timer()); diff --git a/transition_functions/src/capella/epoch_processing.rs b/transition_functions/src/capella/epoch_processing.rs index e16193a4..2732d0a0 100644 --- a/transition_functions/src/capella/epoch_processing.rs +++ b/transition_functions/src/capella/epoch_processing.rs @@ -1,7 +1,6 @@ use anyhow::Result; use arithmetic::{NonZeroExt as _, U64Ext as _}; use helper_functions::{accessors::get_next_epoch, misc::vec_of_default}; -use prometheus_metrics::METRICS; use ssz::SszHash as _; use types::{ capella::{beacon_state::BeaconState, containers::HistoricalSummary}, @@ -16,7 +15,11 @@ use crate::{ bellatrix, unphased, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub fn process_epoch(config: &Config, state: &mut BeaconState) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.epoch_processing_times.start_timer()); diff --git a/transition_functions/src/combined.rs b/transition_functions/src/combined.rs index 01571774..1d5c2b7c 100644 --- a/transition_functions/src/combined.rs +++ b/transition_functions/src/combined.rs @@ -7,7 +7,6 @@ use helper_functions::{ slot_report::{NullSlotReport, RealSlotReport, SlotReport}, verifier::{MultiVerifier, NullVerifier, Verifier, VerifierOption}, }; -use prometheus_metrics::METRICS; use static_assertions::const_assert_eq; use thiserror::Error; use types::{ @@ -31,6 +30,9 @@ use crate::{ unphased::{self, Error, ProcessSlots, StateRootPolicy}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + #[derive(From)] pub enum EpochReport { Phase0(Phase0EpochReport), @@ -247,6 +249,7 @@ pub fn process_slots( }, ); + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.process_slot_times.start_timer()); diff --git a/transition_functions/src/deneb/blinded_block_processing.rs b/transition_functions/src/deneb/blinded_block_processing.rs index aaf5fb0b..a3ed37b1 100644 --- a/transition_functions/src/deneb/blinded_block_processing.rs +++ b/transition_functions/src/deneb/blinded_block_processing.rs @@ -1,6 +1,5 @@ use anyhow::{ensure, Result}; use helper_functions::{accessors, misc, slot_report::SlotReport, verifier::Verifier}; -use prometheus_metrics::METRICS; use types::{ config::Config, deneb::{ @@ -16,6 +15,9 @@ use crate::{ unphased::{self, Error}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub fn custom_process_blinded_block( config: &Config, state: &mut BeaconState

, @@ -23,6 +25,7 @@ pub fn custom_process_blinded_block( mut verifier: impl Verifier, mut slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.blinded_block_transition_times.start_timer()); diff --git a/transition_functions/src/deneb/block_processing.rs b/transition_functions/src/deneb/block_processing.rs index 3a4ccd2b..f6d58ecf 100644 --- a/transition_functions/src/deneb/block_processing.rs +++ b/transition_functions/src/deneb/block_processing.rs @@ -12,7 +12,6 @@ use helper_functions::{ slot_report::SlotReport, verifier::{SingleVerifier, Triple, Verifier}, }; -use prometheus_metrics::METRICS; use rayon::iter::{IntoParallelRefIterator as _, ParallelIterator as _}; use ssz::SszHash as _; use typenum::Unsigned as _; @@ -39,6 +38,9 @@ use crate::{ unphased::{self, Error}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + /// [`process_block`](TODO(feature/deneb)) /// /// This also serves as a substitute for [`compute_new_state_root`]. `compute_new_state_root` as @@ -57,6 +59,7 @@ pub fn process_block( mut verifier: impl Verifier, slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.block_transition_times.start_timer()); diff --git a/transition_functions/src/deneb/epoch_processing.rs b/transition_functions/src/deneb/epoch_processing.rs index 08cfecb3..a7320a52 100644 --- a/transition_functions/src/deneb/epoch_processing.rs +++ b/transition_functions/src/deneb/epoch_processing.rs @@ -8,7 +8,6 @@ use helper_functions::{ predicates::{is_active_validator, is_eligible_for_activation}, }; use itertools::Itertools as _; -use prometheus_metrics::METRICS; use ssz::SszHash as _; use types::{ capella::containers::HistoricalSummary, config::Config, deneb::beacon_state::BeaconState, @@ -22,7 +21,11 @@ use crate::{ unphased::ValidatorSummary, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub fn process_epoch(config: &Config, state: &mut BeaconState) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.epoch_processing_times.start_timer()); diff --git a/transition_functions/src/electra/blinded_block_processing.rs b/transition_functions/src/electra/blinded_block_processing.rs index e29647fc..59bc08b6 100644 --- a/transition_functions/src/electra/blinded_block_processing.rs +++ b/transition_functions/src/electra/blinded_block_processing.rs @@ -1,6 +1,5 @@ use anyhow::{ensure, Result}; use helper_functions::{accessors, misc, slot_report::SlotReport, verifier::Verifier}; -use prometheus_metrics::METRICS; use types::{ config::Config, electra::{ @@ -16,6 +15,9 @@ use crate::{ unphased::{self, Error}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub fn custom_process_blinded_block( config: &Config, state: &mut BeaconState

, @@ -23,6 +25,7 @@ pub fn custom_process_blinded_block( mut verifier: impl Verifier, mut slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.blinded_block_transition_times.start_timer()); diff --git a/transition_functions/src/electra/block_processing.rs b/transition_functions/src/electra/block_processing.rs index c4ee56e0..a2a86839 100644 --- a/transition_functions/src/electra/block_processing.rs +++ b/transition_functions/src/electra/block_processing.rs @@ -35,7 +35,6 @@ use helper_functions::{ verifier::{SingleVerifier, Triple, Verifier}, }; use itertools::izip; -use prometheus_metrics::METRICS; use rayon::iter::{IntoParallelRefIterator as _, ParallelIterator as _}; use ssz::{PersistentList, SszHash as _}; use tap::Pipe as _; @@ -77,6 +76,9 @@ use crate::{ unphased::{self, CombinedDeposit, Error}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + /// [`process_block`](TODO(feature/electra)) /// /// This also serves as a substitute for [`compute_new_state_root`]. `compute_new_state_root` as @@ -95,6 +97,7 @@ pub fn process_block( mut verifier: impl Verifier, slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.block_transition_times.start_timer()); diff --git a/transition_functions/src/electra/epoch_processing.rs b/transition_functions/src/electra/epoch_processing.rs index 3b3f57a2..863ea331 100644 --- a/transition_functions/src/electra/epoch_processing.rs +++ b/transition_functions/src/electra/epoch_processing.rs @@ -16,7 +16,6 @@ use helper_functions::{ predicates::{is_active_validator, is_eligible_for_activation}, signing::SignForAllForks as _, }; -use prometheus_metrics::METRICS; use ssz::{PersistentList, SszHash as _}; use try_from_iterator::TryFromIterator as _; use typenum::Unsigned as _; @@ -42,7 +41,11 @@ use crate::{ unphased::{SlashingPenalties, ValidatorSummary}, }; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub fn process_epoch(config: &Config, state: &mut ElectraBeaconState) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.epoch_processing_times.start_timer()); diff --git a/transition_functions/src/phase0/block_processing.rs b/transition_functions/src/phase0/block_processing.rs index 30ad9617..f81b87fd 100644 --- a/transition_functions/src/phase0/block_processing.rs +++ b/transition_functions/src/phase0/block_processing.rs @@ -12,7 +12,6 @@ use helper_functions::{ slot_report::{NullSlotReport, SlotReport}, verifier::{SingleVerifier, Triple, Verifier}, }; -use prometheus_metrics::METRICS; use rayon::iter::{IntoParallelRefIterator as _, ParallelIterator as _}; use typenum::Unsigned as _; use types::{ @@ -34,6 +33,9 @@ use types::{ use crate::unphased::{self, CombinedDeposit, Error}; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + /// /// /// This also serves as a substitute for [`compute_new_state_root`]. `compute_new_state_root` as @@ -52,6 +54,7 @@ pub fn process_block( mut verifier: impl Verifier, slot_report: impl SlotReport, ) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.block_transition_times.start_timer()); diff --git a/transition_functions/src/phase0/epoch_processing.rs b/transition_functions/src/phase0/epoch_processing.rs index c8c23511..3883eb2e 100644 --- a/transition_functions/src/phase0/epoch_processing.rs +++ b/transition_functions/src/phase0/epoch_processing.rs @@ -5,7 +5,6 @@ use anyhow::Result; use helper_functions::{ accessors::get_current_epoch, misc::vec_of_default, mutators::decrease_balance, }; -use prometheus_metrics::METRICS; use typenum::Unsigned as _; use types::{ config::Config, @@ -22,6 +21,9 @@ use super::epoch_intermediates::{ }; use crate::unphased::{self, SlashingPenalties}; +#[cfg(feature = "metrics")] +use prometheus_metrics::METRICS; + pub struct EpochReport { pub statistics: StatisticsForReport, pub summaries: Vec, @@ -32,6 +34,7 @@ pub struct EpochReport { } pub fn process_epoch(config: &Config, state: &mut BeaconState) -> Result<()> { + #[cfg(feature = "metrics")] let _timer = METRICS .get() .map(|metrics| metrics.epoch_processing_times.start_timer()); From 1b3c9819ce99eda1b6c89e32213fc616b4722c2e Mon Sep 17 00:00:00 2001 From: Tumas Date: Thu, 28 Nov 2024 12:33:23 +0200 Subject: [PATCH 5/7] Run slashing protection db in an EXCLUSIVE locking mode --- slashing_protection/src/lib.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/slashing_protection/src/lib.rs b/slashing_protection/src/lib.rs index 3a456536..ed791e63 100644 --- a/slashing_protection/src/lib.rs +++ b/slashing_protection/src/lib.rs @@ -200,6 +200,10 @@ impl SlashingProtector { // See . connection.pragma_update(None, "cache_size", -20000)?; + // Prevent other processes from accessing the database file. + // See . + connection.pragma_update(None, "locking_mode", "EXCLUSIVE")?; + Ok(()) } @@ -1133,8 +1137,15 @@ mod tests { |row| row.get::<_, i64>(0), )?; + let locking_mode = slashing_protector.connection.query_row( + "SELECT locking_mode FROM pragma_locking_mode", + (), + |row| row.get::<_, String>(0), + )?; + assert!(foreign_keys); assert_eq!(cache_size, -20000); + assert_eq!(locking_mode, "exclusive"); Ok(()) } From 6ff617dcbdcb83b3a8ce5684c34c4f735c202780 Mon Sep 17 00:00:00 2001 From: Tumas Date: Thu, 28 Nov 2024 12:33:23 +0200 Subject: [PATCH 6/7] Add extra print info about exported slashing protection interchange records --- grandine/src/main.rs | 25 +++++++- slashing_protection/src/interchange_format.rs | 58 +++++++++++++++++++ slashing_protection/src/lib.rs | 4 +- 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/grandine/src/main.rs b/grandine/src/main.rs index e12ae596..932fe7aa 100644 --- a/grandine/src/main.rs +++ b/grandine/src/main.rs @@ -27,7 +27,7 @@ use reqwest::{Client, ClientBuilder, Url}; use runtime::{MetricsConfig, StorageConfig}; use signer::{KeyOrigin, Signer}; use slasher::SlasherConfig; -use slashing_protection::SlashingProtector; +use slashing_protection::{interchange_format::InterchangeData, SlashingProtector}; use ssz::SszRead as _; use std_ext::ArcExt as _; use thiserror::Error; @@ -719,9 +719,30 @@ fn handle_command( ); } InterchangeCommand::Export { file_path } => { - slashing_protector + let interchange = slashing_protector .export_to_interchange_file(&file_path, genesis_validators_root)?; + if interchange.is_empty() { + warn!( + "no records were exported. \ + This may indicate an issue if active validators are present. \ + Please verify your configuration settings.", + ); + } else { + for data in interchange.data { + let InterchangeData { + pubkey, + signed_attestations, + signed_blocks, + } = data; + + info!( + "exported {} records for {pubkey:?}", + signed_attestations.len() + signed_blocks.len(), + ); + } + } + info!("interchange file exported to {file_path:?}"); } } diff --git a/slashing_protection/src/interchange_format.rs b/slashing_protection/src/interchange_format.rs index e71fa30e..0c36659b 100644 --- a/slashing_protection/src/interchange_format.rs +++ b/slashing_protection/src/interchange_format.rs @@ -55,6 +55,13 @@ pub struct InterchangeAttestation { pub signing_root: Option, } +impl InterchangeData { + #[must_use] + pub fn is_empty(&self) -> bool { + self.signed_attestations.is_empty() && self.signed_blocks.is_empty() + } +} + impl InterchangeFormat { #[must_use] pub const fn new(genesis_validators_root: H256, data: Vec) -> Self { @@ -94,6 +101,11 @@ impl InterchangeFormat { Ok(()) } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.data.iter().all(InterchangeData::is_empty) + } } #[derive(Debug, Error)] @@ -262,4 +274,50 @@ mod tests { Ok(()) } + + #[test] + fn interchange_format_emptyness_test() { + let mut interchange = InterchangeFormat { + metadata: InterchangeMeta { + interchange_format_version: 5, + genesis_validators_root: hex!( + "04700007fabc8282644aed6d1c7c9e21d38a03a0c4ba193f3afe428824b3a673" + ) + .into(), + }, + data: vec![], + }; + + assert!(interchange.is_empty()); + + let empty_interchange_data = InterchangeData { + pubkey: hex!("b845089a1457f811bfc000588fbb4e713669be8ce060ea6be3c6ece09afc3794106c91ca73acda5e5457122d58723bec").into(), + signed_blocks: vec![], + signed_attestations: vec![], + }; + + assert!(empty_interchange_data.is_empty()); + + interchange.data.push(empty_interchange_data); + + // interchange with empty interchange data should also be considered empty + assert!(interchange.is_empty()); + + let interchange_data = InterchangeData { + pubkey: hex!("b845089a1457f811bfc000588fbb4e713669be8ce060ea6be3c6ece09afc3794106c91ca73acda5e5457122d58723bed").into(), + signed_blocks: vec![ + InterchangeBlock { + slot: 81952, + signing_root: Some(hex!("4ff6f743a43f3b4f95350831aeaf0a122a1a392922c45d804280284a69eb850b").into()), + }, + ], + signed_attestations: vec![], + }; + + assert!(!interchange_data.is_empty()); + + interchange.data.push(interchange_data); + + assert!(!interchange.is_empty()); + } } diff --git a/slashing_protection/src/lib.rs b/slashing_protection/src/lib.rs index ed791e63..f33330e6 100644 --- a/slashing_protection/src/lib.rs +++ b/slashing_protection/src/lib.rs @@ -304,7 +304,7 @@ impl SlashingProtector { &mut self, interchange_file_path: impl AsRef, genesis_validators_root: H256, - ) -> Result<()> { + ) -> Result { let interchange = self.build_interchange_data(genesis_validators_root)?; let interchange_file_path = interchange_file_path.as_ref(); @@ -316,7 +316,7 @@ impl SlashingProtector { info!("Interchange file saved"); - Ok(()) + Ok(interchange) } pub fn build_interchange_data( From c416d654847c75d8cf55c24ef50248d636098d43 Mon Sep 17 00:00:00 2001 From: Tumas Date: Thu, 28 Nov 2024 12:33:23 +0200 Subject: [PATCH 7/7] Start with a lowercase letter in slashing_protection log messages --- slashing_protection/src/lib.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/slashing_protection/src/lib.rs b/slashing_protection/src/lib.rs index f33330e6..2e3cf8f8 100644 --- a/slashing_protection/src/lib.rs +++ b/slashing_protection/src/lib.rs @@ -245,7 +245,7 @@ impl SlashingProtector { let result = Self::find_or_store_validator(&transaction, pubkey); if let Ok(validator_id) = result { - debug!("Successfully imported validator (pubkey: {pubkey:?})"); + debug!("successfully imported validator (pubkey: {pubkey:?})"); report.validators.succeeded.push(interchange_record.pubkey); @@ -309,12 +309,12 @@ impl SlashingProtector { let interchange_file_path = interchange_file_path.as_ref(); - info!("Saving validator information to interchange file: {interchange_file_path:?}"); + info!("saving validator information to interchange file: {interchange_file_path:?}"); let file = File::create(interchange_file_path)?; serde_json::to_writer(file, &interchange)?; - info!("Interchange file saved"); + info!("interchange file saved"); Ok(interchange) } @@ -413,7 +413,7 @@ impl SlashingProtector { return Ok(validator_id); } - debug!("Saving validator information to slashing protection db (pubkey: {pubkey:?})",); + debug!("saving validator information to slashing protection db (pubkey: {pubkey:?})"); transaction.execute( "INSERT INTO validators (pubkey) VALUES (?1)", @@ -625,7 +625,7 @@ impl SlashingProtector { Ok(outcome) => match outcome { SlashingValidationOutcome::Accept => Some(attestation), SlashingValidationOutcome::Ignore => { - warn!("slashing protector ignored duplicate attestation: {attestation:?}",); + warn!("slashing protector ignored duplicate attestation: {attestation:?}"); None } @@ -746,10 +746,10 @@ impl SlashingProtector { None => self.store_current_epoch(current_epoch)?, } - info!("Pruning slashing protection db, current epoch: {current_epoch}"); + info!("pruning slashing protection db, current epoch: {current_epoch}"); let Some(prune_up_to_epoch) = current_epoch.checked_sub(self.history_limit) else { - debug!("Skipping slashing protection db pruning for epoch: {current_epoch}"); + debug!("skipping slashing protection db pruning for epoch: {current_epoch}"); return Ok(()); }; @@ -762,8 +762,8 @@ impl SlashingProtector { }; match run() { - Ok(()) => info!("Slashing protection db pruning completed for epoch: {current_epoch}"), - Err(error) => warn!("Error occurred while pruning slashing protection db: {error:?}"), + Ok(()) => info!("slashing protection db pruning completed for epoch: {current_epoch}"), + Err(error) => warn!("error occurred while pruning slashing protection db: {error:?}"), } Ok(()) @@ -912,7 +912,7 @@ fn remove_fork_version_from_validators_if_needed( return Ok(()); } - info!("Migrating the slashing protection database. Please wait…"); + info!("migrating the slashing protection database. Please wait…"); let interchange = slashing_protector.build_interchange_data(genesis_validators_root)?; @@ -922,14 +922,14 @@ fn remove_fork_version_from_validators_if_needed( )); info!( - "Saving validator information to interchange file as a backup: {}", + "saving validator information to interchange file as a backup: {}", interchange_file_path.display(), ); let file = File::create(interchange_file_path)?; serde_json::to_writer(file, &interchange)?; - info!("Interchange file saved"); + info!("interchange file saved"); fs_err::remove_file(validator_directory.as_ref().join(DB_PATH))?;