Commit
Merge branch 'develop' into pectra-devnet4
povi committed Nov 28, 2024
2 parents 201219d + c416d65 commit 9af1b11
Showing 29 changed files with 274 additions and 62 deletions.
64 changes: 47 additions & 17 deletions block_producer/src/block_producer.rs
@@ -101,6 +101,11 @@ pub type ExecutionPayloadHeaderJoinHandle<P> = JoinHandle<Result<Option<SignedBu
pub type LocalExecutionPayloadJoinHandle<P> =
JoinHandle<Option<WithBlobsAndMev<ExecutionPayload<P>, P>>>;

#[derive(Default)]
pub struct Options {
pub fake_execution_payloads: bool,
}

pub struct BlockProducer<P: Preset, W: Wait> {
producer_context: Arc<ProducerContext<P, W>>,
}
@@ -118,7 +123,12 @@ impl<P: Preset, W: Wait> BlockProducer<P, W> {
bls_to_execution_change_pool: Arc<BlsToExecutionChangePool>,
sync_committee_agg_pool: Arc<SyncCommitteeAggPool<P, W>>,
metrics: Option<Arc<Metrics>>,
options: Option<Options>,
) -> Self {
let Options {
fake_execution_payloads,
} = options.unwrap_or_default();

let producer_context = Arc::new(ProducerContext {
chain_config: controller.chain_config().clone_arc(),
proposer_configs,
Expand All @@ -137,6 +147,7 @@ impl<P: Preset, W: Wait> BlockProducer<P, W> {
payload_cache: Mutex::new(SizedCache::with_size(PAYLOAD_CACHE_SIZE)),
payload_id_cache: Mutex::new(SizedCache::with_size(PAYLOAD_ID_CACHE_SIZE)),
metrics,
fake_execution_payloads,
});

Self { producer_context }
@@ -574,6 +585,7 @@ struct ProducerContext<P: Preset, W: Wait> {
payload_cache: Mutex<SizedCache<H256, WithBlobsAndMev<ExecutionPayload<P>, P>>>,
payload_id_cache: Mutex<SizedCache<(H256, Slot), PayloadId>>,
metrics: Option<Arc<Metrics>>,
fake_execution_payloads: bool,
}

#[derive(Clone, Copy, Default)]
@@ -1019,29 +1031,24 @@ impl<P: Preset, W: Wait> BlockBuildContext<P, W> {
};

let WithBlobsAndMev {
value: mut execution_payload,
value: execution_payload,
commitments,
proofs,
blobs,
mev,
execution_requests,
} = with_blobs_and_mev.unwrap_or_else(|| WithBlobsAndMev::with_default(None));

let slot = self.beacon_state.slot();
} = match with_blobs_and_mev {
Some(payload_with_mev) => payload_with_mev,
None => {
if self.beacon_state.post_capella().is_some()
|| post_merge_state(&self.beacon_state).is_some()
{
return Err(AnyhowError::msg("no execution payload to include in block"));
}

// Starting with Capella, all blocks must be post-Merge.
// Construct a superficially valid execution payload for snapshot testing.
// It will almost always be invalid in a real network, but so would a default payload.
// Construct the payload with a fictitious `ExecutionBlockHash` derived from the slot.
// Computing the real `ExecutionBlockHash` would make maintaining tests much harder.
if self.beacon_state.phase() >= Phase::Capella && execution_payload.is_none() {
execution_payload = Some(factory::execution_payload(
&self.producer_context.chain_config,
&self.beacon_state,
slot,
ExecutionBlockHash::from_low_u64_be(slot),
)?);
}
WithBlobsAndMev::with_default(None)
}
};

let without_state_root_with_payload = block_without_state_root
.with_execution_payload(execution_payload)?
@@ -1703,6 +1710,29 @@ impl<P: Preset, W: Wait> BlockBuildContext<P, W> {
async fn local_execution_payload_option(
&self,
) -> Option<WithBlobsAndMev<ExecutionPayload<P>, P>> {
if self.producer_context.fake_execution_payloads {
let slot = self.beacon_state.slot();

// Starting with Capella, all blocks must be post-Merge.
// Construct a superficially valid execution payload for snapshot testing.
// It will almost always be invalid in a real network, but so would a default payload.
// Construct the payload with a fictitious `ExecutionBlockHash` derived from the slot.
// Computing the real `ExecutionBlockHash` would make maintaining tests much harder.
if self.beacon_state.phase() >= Phase::Capella {
let execution_payload = factory::execution_payload(
&self.producer_context.chain_config,
&self.beacon_state,
slot,
ExecutionBlockHash::from_low_u64_be(slot),
);

match execution_payload {
Ok(payload) => return Some(WithBlobsAndMev::with_default(payload)),
Err(error) => panic!("failed to produce fake payload: {error:?}"),
};
}
}

let _timer = self
.producer_context
.metrics
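The comments in the new `local_execution_payload_option` path describe deriving a deterministic, fictitious `ExecutionBlockHash` from the slot. A minimal standalone sketch of that derivation, assuming `ExecutionBlockHash` is an `H256`-style 32-byte hash as in the `ethereum-types` crate (the crate choice and the `main` harness are illustrative, not part of the commit):

use ethereum_types::H256;

fn main() {
    let slot: u64 = 12_345;

    // The slot is written big-endian into the lowest 8 bytes of the hash,
    // so every slot maps to a distinct, reproducible value with no
    // execution client involved.
    let fake_hash = H256::from_low_u64_be(slot);

    assert_eq!(fake_hash.to_low_u64_be(), slot);
    println!("{fake_hash:?}"); // 0x0000…3039

}

Snapshot tests can then regenerate identical blocks across runs, which is the point of preferring a slot-derived hash over computing a real `ExecutionBlockHash`.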
2 changes: 1 addition & 1 deletion block_producer/src/lib.rs
@@ -1,4 +1,4 @@
pub use block_producer::{BlockBuildOptions, BlockProducer};
pub use block_producer::{BlockBuildOptions, BlockProducer, Options};
pub use misc::{ProposerData, ValidatorBlindedBlock};

mod block_producer;
25 changes: 23 additions & 2 deletions grandine/src/main.rs
@@ -27,7 +27,7 @@ use reqwest::{Client, ClientBuilder, Url};
use runtime::{MetricsConfig, StorageConfig};
use signer::{KeyOrigin, Signer};
use slasher::SlasherConfig;
use slashing_protection::SlashingProtector;
use slashing_protection::{interchange_format::InterchangeData, SlashingProtector};
use ssz::SszRead as _;
use std_ext::ArcExt as _;
use thiserror::Error;
@@ -722,9 +722,30 @@ fn handle_command<P: Preset>(
);
}
InterchangeCommand::Export { file_path } => {
slashing_protector
let interchange = slashing_protector
.export_to_interchange_file(&file_path, genesis_validators_root)?;

if interchange.is_empty() {
warn!(
"no records were exported. \
This may indicate an issue if active validators are present. \
Please verify your configuration settings.",
);
} else {
for data in interchange.data {
let InterchangeData {
pubkey,
signed_attestations,
signed_blocks,
} = data;

info!(
"exported {} records for {pubkey:?}",
signed_attestations.len() + signed_blocks.len(),
);
}
}

info!("interchange file exported to {file_path:?}");
}
}
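For orientation, each `InterchangeData` entry corresponds to one validator in an EIP-3076-style slashing-protection interchange file. A rough self-contained sketch of that shape and of the per-validator total logged above (field and type names here are illustrative, not Grandine's exact definitions):

struct SignedBlockRecord {
    slot: u64,
}

struct SignedAttestationRecord {
    source_epoch: u64,
    target_epoch: u64,
}

struct InterchangeData {
    pubkey: [u8; 48], // BLS public key
    signed_blocks: Vec<SignedBlockRecord>,
    signed_attestations: Vec<SignedAttestationRecord>,
}

fn main() {
    let data = InterchangeData {
        pubkey: [0; 48],
        signed_blocks: vec![SignedBlockRecord { slot: 81_952 }],
        signed_attestations: vec![SignedAttestationRecord {
            source_epoch: 2_290,
            target_epoch: 3_007,
        }],
    };

    // Mirrors `signed_attestations.len() + signed_blocks.len()` in the
    // export loop above.
    println!(
        "exported {} records",
        data.signed_attestations.len() + data.signed_blocks.len(),
    );
}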
5 changes: 4 additions & 1 deletion helper_functions/Cargo.toml
@@ -19,7 +19,7 @@ im = { workspace = true }
itertools = { workspace = true }
num-integer = { workspace = true }
parse-display = { workspace = true }
prometheus_metrics = { workspace = true }
prometheus_metrics = { workspace = true, optional = true }
rayon = { workspace = true }
rc-box = { workspace = true }
serde = { workspace = true }
@@ -41,3 +41,6 @@ nonzero_ext = { workspace = true }
spec_test_utils = { workspace = true }
test-case = { workspace = true }
test-generator = { workspace = true }

[features]
metrics = ['prometheus_metrics']
9 changes: 8 additions & 1 deletion helper_functions/src/accessors.rs
@@ -12,7 +12,6 @@ use bls::{AggregatePublicKey, CachedPublicKey, PublicKeyBytes};
use im::HashMap;
use itertools::{EitherOrBoth, Itertools as _};
use num_integer::Roots as _;
use prometheus_metrics::METRICS;
use rc_box::ArcBox;
use ssz::{ContiguousVector, FitsInU64, Hc, SszHash as _};
use std_ext::CopyExt as _;
@@ -47,6 +46,9 @@ use types::{

use crate::{error::Error, misc, predicates};

#[cfg(feature = "metrics")]
use prometheus_metrics::METRICS;

#[must_use]
pub fn get_previous_epoch<P: Preset>(state: &impl BeaconState<P>) -> Epoch {
get_current_epoch(state)
@@ -197,6 +199,7 @@ pub fn get_or_init_validator_indices<P: Preset>(
) -> &HashMap<PublicKeyBytes, ValidatorIndex> {
state.cache().validator_indices.get_or_init(|| {
if report_cache_miss {
#[cfg(feature = "metrics")]
if let Some(metrics) = METRICS.get() {
metrics.validator_indices_init_count.inc();
}
@@ -264,6 +267,7 @@ pub fn get_or_init_active_validator_indices_ordered<P: Preset>(

state.cache().active_validator_indices_ordered[relative_epoch].get_or_init(|| {
if report_cache_miss {
#[cfg(feature = "metrics")]
if let Some(metrics) = METRICS.get() {
metrics.active_validator_indices_ordered_init_count.inc();
}
@@ -316,6 +320,7 @@ where

state.cache().active_validator_indices_shuffled[relative_epoch].get_or_init(|| {
if report_cache_miss {
#[cfg(feature = "metrics")]
if let Some(metrics) = METRICS.get() {
metrics.active_validator_indices_shuffled_init_count.inc();
}
@@ -463,6 +468,7 @@ pub fn get_or_try_init_beacon_proposer_index<P: Preset>(
.proposer_index
.get_or_try_init(|| {
if report_cache_miss {
#[cfg(feature = "metrics")]
if let Some(metrics) = METRICS.get() {
metrics.beacon_proposer_index_init_count.inc();
}
@@ -527,6 +533,7 @@ pub fn get_or_init_total_active_balance<P: Preset>(
state.cache().total_active_balance[RelativeEpoch::Current]
.get_or_init(|| {
if report_cache_miss {
#[cfg(feature = "metrics")]
if let Some(metrics) = METRICS.get() {
metrics.total_active_balance_init_count.inc();
}
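Since `prometheus_metrics` is now optional in helper_functions, every counter bump sits behind `#[cfg(feature = "metrics")]` and disappears at compile time when the feature is off. A minimal sketch of the gating pattern, assuming a crate that declares its own `metrics` feature; the `Metrics` struct and counter name are stand-ins for the real global in `prometheus_metrics`:

#[cfg(feature = "metrics")]
mod metrics {
    use std::sync::{atomic::AtomicU64, OnceLock};

    pub struct Metrics {
        pub validator_indices_init_count: AtomicU64,
    }

    // Stand-in for `prometheus_metrics::METRICS`.
    pub static METRICS: OnceLock<Metrics> = OnceLock::new();
}

fn report_cache_miss() {
    // This block is compiled only with `cargo build --features metrics`,
    // which is what the metrics crate's Cargo.toml change below enables.
    #[cfg(feature = "metrics")]
    if let Some(metrics) = metrics::METRICS.get() {
        metrics
            .validator_indices_init_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }
}

fn main() {
    report_cache_miss();
}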
5 changes: 4 additions & 1 deletion http_api/src/context.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;

use anyhow::Result;
use attestation_verifier::AttestationVerifier;
use block_producer::BlockProducer;
use block_producer::{BlockProducer, Options as BlockProducerOptions};
use bls::{PublicKeyBytes, SecretKey};
use clock::Tick;
use database::Database;
@@ -284,6 +284,9 @@ impl<P: Preset> Context<P> {
bls_to_execution_change_pool.clone_arc(),
sync_committee_agg_pool.clone_arc(),
None,
Some(BlockProducerOptions {
fake_execution_payloads: true,
}),
));

let validator_channels = ValidatorChannels {
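Passing `Some(BlockProducerOptions { fake_execution_payloads: true })` here is what switches the HTTP API test context onto the fake-payload path; production wiring (see runtime.rs below) passes `None` and gets defaults. A standalone sketch of the `Option<Options>` plus `unwrap_or_default` pattern used by `BlockProducer::new` (the `new_producer` harness is illustrative):

#[derive(Default)]
pub struct Options {
    pub fake_execution_payloads: bool,
}

fn new_producer(options: Option<Options>) -> bool {
    // `None` falls back to `Options::default()`, i.e. all flags false.
    let Options {
        fake_execution_payloads,
    } = options.unwrap_or_default();

    fake_execution_payloads
}

fn main() {
    assert!(!new_producer(None)); // production path: real payloads
    assert!(new_producer(Some(Options {
        fake_execution_payloads: true, // snapshot-testing path
    })));
}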
4 changes: 2 additions & 2 deletions metrics/Cargo.toml
@@ -18,7 +18,7 @@ eth2_libp2p = { workspace = true }
fork_choice_control = { workspace = true }
futures = { workspace = true }
grandine_version = { workspace = true }
helper_functions = { workspace = true }
helper_functions = { workspace = true, features = ['metrics'] }
http_api_utils = { workspace = true }
jemalloc-ctl = { workspace = true }
log = { workspace = true }
@@ -37,7 +37,7 @@ thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tower-http = { workspace = true }
transition_functions = { workspace = true }
transition_functions = { workspace = true, features = ['metrics'] }
types = { workspace = true }

[dev-dependencies]
3 changes: 2 additions & 1 deletion p2p/src/messages.rs
@@ -203,7 +203,8 @@ pub enum ServiceInboundMessage<P: Preset> {
Unsubscribe(GossipTopic),
UnsubscribeFromForkTopicsExcept(ForkDigest),
UpdateEnrSubnet(Subnet, bool),
UpdateForkVersion(EnrForkId),
UpdateFork(EnrForkId),
UpdateGossipsubParameters(u64, Slot),
}

impl<P: Preset> ServiceInboundMessage<P> {
36 changes: 31 additions & 5 deletions p2p/src/network.rs
@@ -29,7 +29,7 @@ use futures::{
select,
stream::StreamExt as _,
};
use helper_functions::misc;
use helper_functions::{accessors, misc};
use log::{debug, error, info, trace, warn};
use logging::PEER_LOG_METRICS;
use operation_pools::{BlsToExecutionChangePool, Origin, PoolToP2pMessage, SyncCommitteeAggPool};
@@ -39,12 +39,13 @@ use slog::{o, Drain as _, Logger};
use slog_stdlog::StdLog;
use std_ext::ArcExt as _;
use thiserror::Error;
use tokio_stream::wrappers::IntervalStream;
use types::{
altair::containers::{SignedContributionAndProof, SyncCommitteeMessage},
capella::containers::SignedBlsToExecutionChange,
combined::{Attestation, AttesterSlashing, SignedAggregateAndProof, SignedBeaconBlock},
deneb::containers::{BlobIdentifier, BlobSidecar},
nonstandard::{Phase, WithStatus},
nonstandard::{Phase, RelativeEpoch, WithStatus},
phase0::{
consts::{FAR_FUTURE_EPOCH, GENESIS_EPOCH},
containers::{ProposerSlashing, SignedVoluntaryExit},
@@ -63,6 +64,7 @@ use crate::{
upnp::PortMappings,
};

const GOSSIPSUB_PARAMETER_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
const MAX_FOR_DOS_PREVENTION: u64 = 64;

/// Number of slots before a new phase to subscribe to its topics.
@@ -202,8 +204,15 @@ impl<P: Preset> Network<P> {

#[allow(clippy::too_many_lines)]
pub async fn run(mut self) -> Result<Never> {
let mut gossipsub_parameter_update_interval =
IntervalStream::new(tokio::time::interval(GOSSIPSUB_PARAMETER_UPDATE_INTERVAL)).fuse();

loop {
select! {
_ = gossipsub_parameter_update_interval.select_next_some() => {
self.update_gossipsub_parameters();
},

message = self.service_to_network_rx.select_next_some() => {
match message {
ServiceOutboundMessage::NetworkEvent(network_event) => {
@@ -449,8 +458,7 @@

let new_enr_fork_id = Self::enr_fork_id(&self.controller, &self.fork_context, slot);

ServiceInboundMessage::UpdateForkVersion(new_enr_fork_id)
.send(&self.network_to_service_tx);
ServiceInboundMessage::UpdateFork(new_enr_fork_id).send(&self.network_to_service_tx);
}

// Subscribe to the topics of the next phase.
@@ -723,6 +731,15 @@
}
}

fn update_gossipsub_parameters(&self) {
let head_state = self.controller.head_state().value();
let active_validator_count =
accessors::active_validator_count_u64(&head_state, RelativeEpoch::Current);

ServiceInboundMessage::UpdateGossipsubParameters(active_validator_count, head_state.slot())
.send(&self.network_to_service_tx);
}

fn update_sync_committee_subnets(
&self,
actions: BTreeMap<SubnetId, SyncCommitteeSubnetAction>,
@@ -1938,8 +1955,17 @@ fn run_network_service<P: Preset>(
ServiceInboundMessage::UpdateEnrSubnet(subnet, advertise) => {
service.update_enr_subnet(subnet, advertise);
}
ServiceInboundMessage::UpdateForkVersion(enr_fork_id) => {
ServiceInboundMessage::UpdateFork(enr_fork_id) => {
service.update_fork_version(enr_fork_id);
service.remove_topic_weight_except(enr_fork_id.fork_digest);
}
ServiceInboundMessage::UpdateGossipsubParameters(active_validator_count, slot) => {
if let Err(error) = service.update_gossipsub_parameters(
active_validator_count,
slot
) {
warn!("unable to update gossipsub scoring parameters: {error:?}");
}
}
}
}
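The periodic update rides on a fused `IntervalStream` polled by the existing `select!` loop, so no extra task is spawned. A minimal self-contained sketch of that pattern, with the interval shortened and the loop bounded so the example terminates (assumes the usual `tokio`, `tokio-stream`, and `futures` crates):

use std::time::Duration;

use futures::{select, stream::StreamExt as _};
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() {
    // Wrap a Tokio interval in a stream and fuse it so `select!` can poll
    // it alongside the other channels in `Network::run`.
    let mut update_interval =
        IntervalStream::new(tokio::time::interval(Duration::from_millis(100))).fuse();

    let mut updates_left = 3_u32;

    loop {
        select! {
            _ = update_interval.select_next_some() => {
                // The real loop calls `update_gossipsub_parameters` here,
                // which reads the head state and sends the active validator
                // count and slot to the network service.
                updates_left -= 1;

                if updates_left == 0 {
                    break;
                }
            }
        }
    }
}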
1 change: 1 addition & 0 deletions runtime/src/runtime.rs
@@ -501,6 +501,7 @@ pub async fn run_after_genesis<P: Preset>(
bls_to_execution_change_pool.clone_arc(),
sync_committee_agg_pool.clone_arc(),
metrics.clone(),
None,
));

let validator_channels = ValidatorChannels {