diff --git a/Cargo.lock b/Cargo.lock index 75812e226f1d..92d114d20152 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5661,6 +5661,7 @@ dependencies = [ "eyre", "fdlimit", "futures", + "futures-util", "human_bytes", "humantime", "hyper", @@ -6322,8 +6323,10 @@ dependencies = [ "reth-interfaces", "reth-metrics", "reth-primitives", + "reth-provider", "reth-rpc-types", "reth-rpc-types-compat", + "reth-tasks", "reth-transaction-pool", "revm", "revm-primitives", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 4ad25b1cd029..8452402cf6db 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -112,6 +112,7 @@ const-str = "0.5.6" boyer-moore-magiclen = "0.2.16" itertools.workspace = true rayon.workspace = true +futures-util.workspace = true [target.'cfg(not(windows))'.dependencies] jemallocator = { version = "0.5.0", optional = true } diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index de1d0170b2ee..266acaa5f19d 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -7,6 +7,7 @@ use crate::cli::{ use clap::Args; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; +use reth_provider::CanonStateSubscriptions; use reth_tasks::TaskSpawner; use std::{fmt, marker::PhantomData}; @@ -161,7 +162,10 @@ pub trait RethNodeCommandConfig: fmt::Debug { components.chain_spec(), payload_builder, ); - let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator); + let (payload_service, payload_builder) = PayloadBuilderService::new( + payload_generator, + components.events().canonical_state_stream(), + ); components .task_executor() diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index a1b37da99296..adb0f8c3a58c 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -26,7 +26,7 @@ use reth_primitives::{ fs::{self}, ChainSpec, }; -use reth_provider::{providers::BlockchainProvider, ProviderFactory}; +use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory}; use reth_revm::EvmProcessorFactory; use reth_rpc_types::{ engine::{CancunPayloadFields, ForkchoiceState, PayloadAttributes}, @@ -175,7 +175,8 @@ impl Command { self.chain.clone(), payload_builder, ); - let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator); + let (payload_service, payload_builder) = + PayloadBuilderService::new(payload_generator, blockchain_db.canonical_state_stream()); ctx.task_executor.spawn_critical("payload builder service", Box::pin(payload_service)); // Configure the consensus engine diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 21fc1ab2c0a4..c540b1065a4f 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -10,24 +10,6 @@ use alloy_rlp::Encodable; use futures_core::ready; use futures_util::FutureExt; -use revm::{ - db::states::bundle_state::BundleRetention, - primitives::{BlockEnv, CfgEnv, Env}, - Database, DatabaseCommit, State, -}; -use std::{ - future::Future, - pin::Pin, - sync::{atomic::AtomicBool, Arc}, - task::{Context, Poll}, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; -use tokio::{ - sync::{oneshot, Semaphore}, - time::{Interval, Sleep}, -}; -use tracing::{debug, trace, warn}; - use reth_interfaces::RethResult; use reth_payload_builder::{ database::CachedReads, error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive, @@ -43,7 +25,8 @@ use reth_primitives::{ B256, EMPTY_OMMER_ROOT_HASH, U256, }; use reth_provider::{ - BlockReaderIdExt, BlockSource, BundleStateWithReceipts, ProviderError, StateProviderFactory, + BlockReaderIdExt, BlockSource, BundleStateWithReceipts, CanonStateNotification, ProviderError, + StateProviderFactory, }; use reth_revm::{ database::StateProviderDatabase, @@ -51,6 +34,23 @@ use reth_revm::{ }; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; +use revm::{ + db::states::bundle_state::BundleRetention, + primitives::{BlockEnv, CfgEnv, Env}, + Database, DatabaseCommit, State, +}; +use std::{ + future::Future, + pin::Pin, + sync::{atomic::AtomicBool, Arc}, + task::{Context, Poll}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::{ + sync::{oneshot, Semaphore}, + time::{Interval, Sleep}, +}; +use tracing::{debug, trace, warn}; use crate::metrics::PayloadBuilderMetrics; @@ -75,6 +75,8 @@ pub struct BasicPayloadJobGenerator { /// /// See [PayloadBuilder] builder: Builder, + /// Stored cached_reads for new payload jobs. + pre_cached: Option, } // === impl BasicPayloadJobGenerator === @@ -97,6 +99,7 @@ impl BasicPayloadJobGenerator BasicPayloadJobGenerator tokio::time::Instant { tokio::time::Instant::now() + self.max_job_duration(unix_timestamp) } + + /// Returns a reference to the tasks type + pub fn tasks(&self) -> &Tasks { + &self.executor + } + + /// Returns the pre-cached reads for the given parent block if it matches the cached state's + /// block. + fn maybe_pre_cached(&self, parent: B256) -> Option { + let pre_cached = self.pre_cached.as_ref()?; + if pre_cached.block == parent { + Some(pre_cached.cached.clone()) + } else { + None + } + } } // === impl BasicPayloadJobGenerator === @@ -167,6 +186,8 @@ where let until = self.job_deadline(config.attributes.timestamp); let deadline = Box::pin(tokio::time::sleep_until(until)); + let cached_reads = self.maybe_pre_cached(config.parent_block.hash()); + Ok(BasicPayloadJob { config, client: self.client.clone(), @@ -176,12 +197,43 @@ where interval: tokio::time::interval(self.config.interval), best_payload: None, pending_block: None, - cached_reads: None, + cached_reads, payload_task_guard: self.payload_task_guard.clone(), metrics: Default::default(), builder: self.builder.clone(), }) } + + fn on_new_state(&mut self, new_state: CanonStateNotification) { + if let Some(committed) = new_state.committed() { + let mut cached = CachedReads::default(); + + // extract the state from the notification and put it into the cache + let new_state = committed.state(); + for (addr, acc) in new_state.bundle_accounts_iter() { + if let Some(info) = acc.info.clone() { + // we want pre cache existing accounts and their storage + // this only includes changed accounts and storage but is better than nothing + let storage = + acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect(); + cached.insert_account(addr, info, storage); + } + } + + self.pre_cached = Some(PrecachedState { block: committed.tip().hash, cached }); + } + } +} + +/// Pre-filled [CachedReads] for a specific block. +/// +/// This is extracted from the [CanonStateNotification] for the tip block. +#[derive(Debug, Clone)] +pub struct PrecachedState { + /// The block for which the state is pre-cached. + pub block: B256, + /// Cached state for the block. + pub cached: CachedReads, } /// Restricts how many generator tasks can be executed at once. diff --git a/crates/payload/builder/Cargo.toml b/crates/payload/builder/Cargo.toml index 854b182459ed..b780acc2d6db 100644 --- a/crates/payload/builder/Cargo.toml +++ b/crates/payload/builder/Cargo.toml @@ -18,6 +18,8 @@ reth-rpc-types.workspace = true reth-transaction-pool.workspace = true reth-interfaces.workspace = true reth-rpc-types-compat.workspace = true +reth-provider.workspace = true +reth-tasks.workspace = true # ethereum alloy-rlp.workspace = true diff --git a/crates/payload/builder/src/database.rs b/crates/payload/builder/src/database.rs index 04998c45b7e6..0205bad335d3 100644 --- a/crates/payload/builder/src/database.rs +++ b/crates/payload/builder/src/database.rs @@ -50,6 +50,16 @@ impl CachedReads { fn as_db_mut(&mut self, db: DB) -> CachedReadsDbMut<'_, DB> { CachedReadsDbMut { cached: self, db } } + + /// Inserts an account info into the cache. + pub fn insert_account( + &mut self, + address: Address, + info: AccountInfo, + storage: HashMap, + ) { + self.accounts.insert(address, CachedAccount { info: Some(info), storage }); + } } #[derive(Debug)] diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 300e514037fe..f760bce110d4 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -7,7 +7,8 @@ use crate::{ error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob, }; -use futures_util::{future::FutureExt, StreamExt}; +use futures_util::{future::FutureExt, Stream, StreamExt}; +use reth_provider::CanonStateNotification; use reth_rpc_types::engine::PayloadId; use std::{ fmt, @@ -160,7 +161,7 @@ impl PayloadBuilderHandle { /// does know nothing about how to build them, it just drives their jobs to completion. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PayloadBuilderService +pub struct PayloadBuilderService where Gen: PayloadJobGenerator, { @@ -174,17 +175,22 @@ where command_rx: UnboundedReceiverStream, /// Metrics for the payload builder service metrics: PayloadBuilderServiceMetrics, + /// Chain events notification stream + chain_events: St, } // === impl PayloadBuilderService === -impl PayloadBuilderService +impl PayloadBuilderService where Gen: PayloadJobGenerator, { /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact /// with it. - pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) { + /// + /// This also takes a stream of chain events that will be forwarded to the generator to apply + /// additional logic when new state is committed. See also [PayloadJobGenerator::on_new_state]. + pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let service = Self { generator, @@ -192,7 +198,9 @@ where service_tx, command_rx: UnboundedReceiverStream::new(command_rx), metrics: Default::default(), + chain_events, }; + let handle = service.handle(); (service, handle) } @@ -271,17 +279,22 @@ where } } -impl Future for PayloadBuilderService +impl Future for PayloadBuilderService where Gen: PayloadJobGenerator + Unpin + 'static, ::Job: Unpin + 'static, + St: Stream + Send + Unpin + 'static, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - loop { + // notify the generator of new chain events + while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) { + this.generator.on_new_state(new_head); + } + // we poll all jobs first, so we always have the latest payload that we can report if // requests // we don't care about the order of the jobs, so we can just swap_remove them diff --git a/crates/payload/builder/src/test_utils.rs b/crates/payload/builder/src/test_utils.rs index 0257c4c0bd01..871c4828b1d0 100644 --- a/crates/payload/builder/src/test_utils.rs +++ b/crates/payload/builder/src/test_utils.rs @@ -6,6 +6,7 @@ use crate::{ PayloadJobGenerator, }; use reth_primitives::{Block, U256}; +use reth_provider::CanonStateNotification; use std::{ future::Future, pin::Pin, @@ -14,9 +15,14 @@ use std::{ }; /// Creates a new [PayloadBuilderService] for testing purposes. -pub fn test_payload_service( -) -> (PayloadBuilderService, PayloadBuilderHandle) { - PayloadBuilderService::new(Default::default()) +pub fn test_payload_service() -> ( + PayloadBuilderService< + TestPayloadJobGenerator, + futures_util::stream::Empty, + >, + PayloadBuilderHandle, +) { + PayloadBuilderService::new(Default::default(), futures_util::stream::empty()) } /// Creates a new [PayloadBuilderService] for testing purposes and spawns it in the background. diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index ab118709fa41..60f6328176bd 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -1,5 +1,7 @@ //! Trait abstractions used by the payload crate. +use reth_provider::CanonStateNotification; + use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes}; use std::{future::Future, sync::Arc}; @@ -80,4 +82,12 @@ pub trait PayloadJobGenerator: Send + Sync { &self, attr: PayloadBuilderAttributes, ) -> Result; + + /// Handles new chain state events + /// + /// This is intended for any logic that needs to be run when the chain state changes or used to + /// use the in memory state for the head block. + fn on_new_state(&mut self, new_state: CanonStateNotification) { + let _ = new_state; + } } diff --git a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs index 958ed5cbd09b..196e8cd5cebe 100644 --- a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs +++ b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs @@ -16,7 +16,10 @@ use reth_trie::{ updates::TrieUpdates, StateRoot, StateRootError, }; -use revm::{db::states::BundleState, primitives::AccountInfo}; +use revm::{ + db::{states::BundleState, BundleAccount}, + primitives::AccountInfo, +}; use std::collections::HashMap; pub use revm::db::states::OriginalValuesKnown; @@ -110,6 +113,11 @@ impl BundleStateWithReceipts { self.bundle.state().iter().map(|(a, acc)| (*a, acc.info.as_ref())) } + /// Return iterator over all [BundleAccount]s in the bundle + pub fn bundle_accounts_iter(&self) -> impl Iterator { + self.bundle.state().iter().map(|(a, acc)| (*a, acc)) + } + /// Get account if account is known. pub fn account(&self, address: &Address) -> Option> { self.bundle.account(address).map(|a| a.info.clone().map(into_reth_acc))