Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: preload cachedreads with tip state #5804

Merged
merged 23 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8513037
feat: explore payload builder, prepare boilerplate code for the next …
allnil Dec 16, 2023
caf046c
feat: pass canon state notification stream to payload loader, add new…
allnil Dec 17, 2023
558313a
save progress
allnil Dec 18, 2023
36befd2
remove generic stream type for chain events
allnil Dec 20, 2023
f260505
Merge branch 'main' of github.com:allnil/reth into feat/preload-cache…
allnil Dec 20, 2023
59fbc48
pass reth components events stream in payload builder service, add bo…
allnil Dec 20, 2023
6b5bfd2
refactor traits, add task to grab events and new tip
allnil Dec 20, 2023
f8705c9
Merge branch 'main' of github.com:allnil/reth into feat/preload-cache…
allnil Dec 21, 2023
1b8f281
Merge branch 'main' of github.com:allnil/reth into feat/preload-cache…
allnil Dec 22, 2023
5ea77b3
chore: refactor code, get canonical_chain_stream, remove generics, tr…
allnil Dec 22, 2023
264f825
chore: get back unpin from canon state notifications in components
allnil Dec 22, 2023
706e992
chore: remove clone trait from payload generator in future
allnil Dec 22, 2023
e1a31f2
chore: successfully pass stream to the payload service, clean rubbish
allnil Dec 22, 2023
42c7059
chore: experiment with cache_reads preservation
allnil Dec 22, 2023
eb6a55e
merge
allnil Dec 30, 2023
86ad016
merge
allnil Dec 30, 2023
65bebed
changed on_new_state signature to &mut self
allnil Dec 30, 2023
f5d1ceb
Merge branch 'main' into feat/preload-cached-reads
mattsse Jan 9, 2024
2adb0c8
feat: preload cache based on committed state
mattsse Jan 9, 2024
0f3d4ae
fix: make tests compile
mattsse Jan 9, 2024
c42025b
move to helper fn
mattsse Jan 9, 2024
e86b1c3
Merge branch 'main' into feat/preload-cached-reads
mattsse Jan 9, 2024
7d6eeea
update interface
mattsse Jan 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ const-str = "0.5.6"
boyer-moore-magiclen = "0.2.16"
itertools.workspace = true
rayon.workspace = true
futures-util.workspace = true

[target.'cfg(not(windows))'.dependencies]
jemallocator = { version = "0.5.0", optional = true }
Expand Down
6 changes: 5 additions & 1 deletion bin/reth/src/cli/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::cli::{
use clap::Args;
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_provider::CanonStateSubscriptions;
use reth_tasks::TaskSpawner;
use std::{fmt, marker::PhantomData};

Expand Down Expand Up @@ -161,7 +162,10 @@ pub trait RethNodeCommandConfig: fmt::Debug {
components.chain_spec(),
payload_builder,
);
let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator);
let (payload_service, payload_builder) = PayloadBuilderService::new(
payload_generator,
components.events().canonical_state_stream(),
);

components
.task_executor()
Expand Down
5 changes: 3 additions & 2 deletions bin/reth/src/commands/debug_cmd/replay_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use reth_primitives::{
fs::{self},
ChainSpec,
};
use reth_provider::{providers::BlockchainProvider, ProviderFactory};
use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory};
use reth_revm::EvmProcessorFactory;
use reth_rpc_types::{
engine::{CancunPayloadFields, ForkchoiceState, PayloadAttributes},
Expand Down Expand Up @@ -175,7 +175,8 @@ impl Command {
self.chain.clone(),
payload_builder,
);
let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator);
let (payload_service, payload_builder) =
PayloadBuilderService::new(payload_generator, blockchain_db.canonical_state_stream());
ctx.task_executor.spawn_critical("payload builder service", Box::pin(payload_service));

// Configure the consensus engine
Expand Down
92 changes: 72 additions & 20 deletions crates/payload/basic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,6 @@
use alloy_rlp::Encodable;
use futures_core::ready;
use futures_util::FutureExt;
use revm::{
db::states::bundle_state::BundleRetention,
primitives::{BlockEnv, CfgEnv, Env},
Database, DatabaseCommit, State,
};
use std::{
future::Future,
pin::Pin,
sync::{atomic::AtomicBool, Arc},
task::{Context, Poll},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::{
sync::{oneshot, Semaphore},
time::{Interval, Sleep},
};
use tracing::{debug, trace, warn};

use reth_interfaces::RethResult;
use reth_payload_builder::{
database::CachedReads, error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive,
Expand All @@ -43,14 +25,32 @@ use reth_primitives::{
B256, EMPTY_OMMER_ROOT_HASH, U256,
};
use reth_provider::{
BlockReaderIdExt, BlockSource, BundleStateWithReceipts, ProviderError, StateProviderFactory,
BlockReaderIdExt, BlockSource, BundleStateWithReceipts, CanonStateNotification, ProviderError,
StateProviderFactory,
};
use reth_revm::{
database::StateProviderDatabase,
state_change::{apply_beacon_root_contract_call, post_block_withdrawals_balance_increments},
};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use revm::{
db::states::bundle_state::BundleRetention,
primitives::{BlockEnv, CfgEnv, Env},
Database, DatabaseCommit, State,
};
use std::{
future::Future,
pin::Pin,
sync::{atomic::AtomicBool, Arc},
task::{Context, Poll},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::{
sync::{oneshot, Semaphore},
time::{Interval, Sleep},
};
use tracing::{debug, trace, warn};

use crate::metrics::PayloadBuilderMetrics;

Expand All @@ -75,6 +75,8 @@ pub struct BasicPayloadJobGenerator<Client, Pool, Tasks, Builder> {
///
/// See [PayloadBuilder]
builder: Builder,
/// Stored cached_reads for new payload jobs.
pre_cached: Option<PrecachedState>,
}

// === impl BasicPayloadJobGenerator ===
Expand All @@ -97,6 +99,7 @@ impl<Client, Pool, Tasks, Builder> BasicPayloadJobGenerator<Client, Pool, Tasks,
config,
chain_spec,
builder,
pre_cached: None,
}
}

Expand All @@ -123,6 +126,22 @@ impl<Client, Pool, Tasks, Builder> BasicPayloadJobGenerator<Client, Pool, Tasks,
fn job_deadline(&self, unix_timestamp: u64) -> tokio::time::Instant {
tokio::time::Instant::now() + self.max_job_duration(unix_timestamp)
}

/// Returns a reference to the tasks type
pub fn tasks(&self) -> &Tasks {
&self.executor
}

/// Returns the pre-cached reads for the given parent block if it matches the cached state's
/// block.
fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
let pre_cached = self.pre_cached.as_ref()?;
if pre_cached.block == parent {
Some(pre_cached.cached.clone())
} else {
None
}
}
}

// === impl BasicPayloadJobGenerator ===
Expand Down Expand Up @@ -167,6 +186,8 @@ where
let until = self.job_deadline(config.attributes.timestamp);
let deadline = Box::pin(tokio::time::sleep_until(until));

let cached_reads = self.maybe_pre_cached(config.parent_block.hash());

Ok(BasicPayloadJob {
config,
client: self.client.clone(),
Expand All @@ -176,12 +197,43 @@ where
interval: tokio::time::interval(self.config.interval),
best_payload: None,
pending_block: None,
cached_reads: None,
cached_reads,
payload_task_guard: self.payload_task_guard.clone(),
metrics: Default::default(),
builder: self.builder.clone(),
})
}

fn on_new_state(&mut self, new_state: CanonStateNotification) {
if let Some(committed) = new_state.committed() {
let mut cached = CachedReads::default();

// extract the state from the notification and put it into the cache
let new_state = committed.state();
for (addr, acc) in new_state.bundle_accounts_iter() {
if let Some(info) = acc.info.clone() {
// we want pre cache existing accounts and their storage
// this only includes changed accounts and storage but is better than nothing
let storage =
acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
cached.insert_account(addr, info, storage);
}
}

self.pre_cached = Some(PrecachedState { block: committed.tip().hash, cached });
}
}
}

/// Pre-filled [CachedReads] for a specific block.
///
/// This is extracted from the [CanonStateNotification] for the tip block.
#[derive(Debug, Clone)]
pub struct PrecachedState {
/// The block for which the state is pre-cached.
pub block: B256,
/// Cached state for the block.
pub cached: CachedReads,
}

/// Restricts how many generator tasks can be executed at once.
Expand Down
2 changes: 2 additions & 0 deletions crates/payload/builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ reth-rpc-types.workspace = true
reth-transaction-pool.workspace = true
reth-interfaces.workspace = true
reth-rpc-types-compat.workspace = true
reth-provider.workspace = true
reth-tasks.workspace = true

# ethereum
alloy-rlp.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions crates/payload/builder/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ impl CachedReads {
fn as_db_mut<DB>(&mut self, db: DB) -> CachedReadsDbMut<'_, DB> {
CachedReadsDbMut { cached: self, db }
}

/// Inserts an account info into the cache.
pub fn insert_account(
&mut self,
address: Address,
info: AccountInfo,
storage: HashMap<U256, U256>,
) {
self.accounts.insert(address, CachedAccount { info: Some(info), storage });
}
}

#[derive(Debug)]
Expand Down
25 changes: 19 additions & 6 deletions crates/payload/builder/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::{
error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator,
BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob,
};
use futures_util::{future::FutureExt, StreamExt};
use futures_util::{future::FutureExt, Stream, StreamExt};
use reth_provider::CanonStateNotification;
use reth_rpc_types::engine::PayloadId;
use std::{
fmt,
Expand Down Expand Up @@ -160,7 +161,7 @@ impl PayloadBuilderHandle {
/// does know nothing about how to build them, it just drives their jobs to completion.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PayloadBuilderService<Gen>
pub struct PayloadBuilderService<Gen, St>
where
Gen: PayloadJobGenerator,
{
Expand All @@ -174,25 +175,32 @@ where
command_rx: UnboundedReceiverStream<PayloadServiceCommand>,
/// Metrics for the payload builder service
metrics: PayloadBuilderServiceMetrics,
/// Chain events notification stream
chain_events: St,
}

// === impl PayloadBuilderService ===

impl<Gen> PayloadBuilderService<Gen>
impl<Gen, St> PayloadBuilderService<Gen, St>
where
Gen: PayloadJobGenerator,
{
/// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact
/// with it.
pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) {
///
/// This also takes a stream of chain events that will be forwarded to the generator to apply
/// additional logic when new state is committed. See also [PayloadJobGenerator::on_new_state].
pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
let service = Self {
generator,
payload_jobs: Vec::new(),
service_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
metrics: Default::default(),
chain_events,
};

let handle = service.handle();
(service, handle)
}
Expand Down Expand Up @@ -271,17 +279,22 @@ where
}
}

impl<Gen> Future for PayloadBuilderService<Gen>
impl<Gen, St> Future for PayloadBuilderService<Gen, St>
where
Gen: PayloadJobGenerator + Unpin + 'static,
<Gen as PayloadJobGenerator>::Job: Unpin + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
{
allnil marked this conversation as resolved.
Show resolved Hide resolved
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
// notify the generator of new chain events
while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
this.generator.on_new_state(new_head);
}

// we poll all jobs first, so we always have the latest payload that we can report if
// requests
// we don't care about the order of the jobs, so we can just swap_remove them
Expand Down
12 changes: 9 additions & 3 deletions crates/payload/builder/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
PayloadJobGenerator,
};
use reth_primitives::{Block, U256};
use reth_provider::CanonStateNotification;
use std::{
future::Future,
pin::Pin,
Expand All @@ -14,9 +15,14 @@ use std::{
};

/// Creates a new [PayloadBuilderService] for testing purposes.
pub fn test_payload_service(
) -> (PayloadBuilderService<TestPayloadJobGenerator>, PayloadBuilderHandle) {
PayloadBuilderService::new(Default::default())
pub fn test_payload_service() -> (
PayloadBuilderService<
TestPayloadJobGenerator,
futures_util::stream::Empty<CanonStateNotification>,
>,
PayloadBuilderHandle,
) {
PayloadBuilderService::new(Default::default(), futures_util::stream::empty())
}

/// Creates a new [PayloadBuilderService] for testing purposes and spawns it in the background.
Expand Down
10 changes: 10 additions & 0 deletions crates/payload/builder/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Trait abstractions used by the payload crate.

use reth_provider::CanonStateNotification;

use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes};
use std::{future::Future, sync::Arc};

Expand Down Expand Up @@ -80,4 +82,12 @@ pub trait PayloadJobGenerator: Send + Sync {
&self,
attr: PayloadBuilderAttributes,
) -> Result<Self::Job, PayloadBuilderError>;

/// Handles new chain state events
///
/// This is intended for any logic that needs to be run when the chain state changes or used to
/// use the in memory state for the head block.
fn on_new_state(&mut self, new_state: CanonStateNotification) {
let _ = new_state;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use reth_trie::{
updates::TrieUpdates,
StateRoot, StateRootError,
};
use revm::{db::states::BundleState, primitives::AccountInfo};
use revm::{
db::{states::BundleState, BundleAccount},
primitives::AccountInfo,
};
use std::collections::HashMap;

pub use revm::db::states::OriginalValuesKnown;
Expand Down Expand Up @@ -110,6 +113,11 @@ impl BundleStateWithReceipts {
self.bundle.state().iter().map(|(a, acc)| (*a, acc.info.as_ref()))
}

/// Return iterator over all [BundleAccount]s in the bundle
pub fn bundle_accounts_iter(&self) -> impl Iterator<Item = (Address, &BundleAccount)> {
self.bundle.state().iter().map(|(a, acc)| (*a, acc))
}

/// Get account if account is known.
pub fn account(&self, address: &Address) -> Option<Option<Account>> {
self.bundle.account(address).map(|a| a.info.clone().map(into_reth_acc))
Expand Down
Loading