From 23a1c7588d3a3c842f591e2edb67c46471c6357a Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 25 Nov 2024 12:47:26 +0100 Subject: [PATCH] refactor: remove incoming state updates thread --- crates/engine/tree/src/tree/root.rs | 292 ++++++++++++++-------------- 1 file changed, 151 insertions(+), 141 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 5f39304c92c4..cc8d6d70e910 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -27,7 +27,6 @@ use std::{ mpsc::{self, Receiver, RecvError, Sender}, Arc, }, - thread, time::{Duration, Instant}, }; use tracing::{debug, error, trace}; @@ -69,6 +68,7 @@ pub(crate) struct StateRootConfig { } /// Wrapper for std channel receiver to maintain compatibility with `UnboundedReceiverStream` +#[derive(Debug)] #[allow(dead_code)] pub(crate) struct StdReceiverStream { rx: Receiver, @@ -88,10 +88,6 @@ impl StdReceiverStream { /// Messages that can be received by the state root task #[derive(Debug)] pub(crate) enum StateRootMessage { - /// New state update from transaction execution - StateUpdate(EvmState), - /// Input state stream has closed - InputStreamClosed, /// Proof calculation completed for a specific state update ProofCalculated { /// The calculated proof @@ -158,6 +154,14 @@ impl ProofSequencer { } } +/// Statistics tracking. +#[derive(Debug, Default)] +struct TaskStats { + updates_received: u64, + proofs_processed: u64, + roots_calculated: u64, +} + /// Standalone task that receives a transaction state stream and updates relevant /// data structures to calculate state root. /// @@ -180,8 +184,8 @@ pub(crate) struct StateRootTask { proof_sequencer: ProofSequencer, /// Whether we're currently calculating a root calculating_root: bool, - /// Background thread handle for input state stream processing - _stream_handler: Option>, + /// Incoming state updates. + state_stream: StdReceiverStream, } #[allow(dead_code)] @@ -193,30 +197,6 @@ where pub(crate) fn new(config: StateRootConfig, state_stream: StdReceiverStream) -> Self { let (tx, rx) = mpsc::channel(); - // spawn a background thread to forward state updates from StdReceiverStream - // to our internal message channel - let stream_tx = tx.clone(); - let stream_handler = thread::Builder::new() - .name("State Stream Handler".to_string()) - .spawn(move || { - loop { - match state_stream.recv() { - Ok(state) => { - if stream_tx.send(StateRootMessage::StateUpdate(state)).is_err() { - break; - } - } - Err(_) => { - // stream closed normally - debug!(target: "engine::root", "State stream closed normally"); - let _ = stream_tx.send(StateRootMessage::InputStreamClosed); - break; - } - } - } - }) - .expect("failed to spawn stream handler thread"); - Self { config, rx, @@ -224,7 +204,7 @@ where state: Default::default(), proof_sequencer: ProofSequencer::new(), calculating_root: false, - _stream_handler: Some(stream_handler), + state_stream, } } @@ -367,128 +347,158 @@ where }); } + /// Handles internal messages (proofs and root calculations) and tracks state + fn handle_internal_message( + &mut self, + message: StateRootMessage, + stats: &mut TaskStats, + current_multiproof: &mut MultiProof, + trie_updates: &mut TrieUpdates, + current_root: &mut B256, + ) -> Option { + match message { + StateRootMessage::ProofCalculated { proof, sequence_number } => { + stats.proofs_processed += 1; + trace!( + target: "engine::root", + sequence = sequence_number, + total_proofs = stats.proofs_processed, + "Processing calculated proof" + ); + + if let Some(combined_proof) = self.on_proof(proof, sequence_number) { + if self.calculating_root { + current_multiproof.extend(combined_proof); + } else { + self.spawn_root_calculation(combined_proof); + } + } + None + } + StateRootMessage::RootCalculated { root, updates, elapsed } => { + stats.roots_calculated += 1; + trace!( + target: "engine::root", + %root, + ?elapsed, + roots_calculated = stats.roots_calculated, + proofs = stats.proofs_processed, + updates = stats.updates_received, + "Computed intermediate root" + ); + *current_root = root; + trie_updates.extend(updates); + self.calculating_root = false; + + let has_new_proofs = !current_multiproof.account_subtree.is_empty() || + !current_multiproof.storages.is_empty(); + let all_proofs_received = stats.proofs_processed >= stats.updates_received; + let no_pending = !self.proof_sequencer.has_pending(); + + if has_new_proofs { + trace!( + target: "engine::root", + account_proofs = current_multiproof.account_subtree.len(), + storage_proofs = current_multiproof.storages.len(), + "Spawning subsequent root calculation" + ); + self.spawn_root_calculation(std::mem::take(current_multiproof)); + None + } else if all_proofs_received && no_pending { + debug!( + target: "engine::root", + total_updates = stats.updates_received, + total_proofs = stats.proofs_processed, + roots_calculated = stats.roots_calculated, + "All proofs processed, ending calculation" + ); + Some(Ok((*current_root, trie_updates.clone()))) + } else { + None + } + } + } + } + + /// Handle internal message channel errors + fn handle_internal_error() -> StateRootResult { + error!(target: "engine::root", "Internal message channel closed unexpectedly"); + Err(ParallelStateRootError::Other("Internal message channel closed unexpectedly".into())) + } + fn run(mut self) -> StateRootResult { + let mut stats = TaskStats::default(); let mut current_multiproof = MultiProof::default(); let mut trie_updates = TrieUpdates::default(); let mut current_root = B256::default(); - let mut updates_received = 0; - let mut proofs_processed = 0; - let mut roots_calculated = 0; - let mut input_stream_closed = false; loop { - match self.rx.recv() { - Ok(message) => match message { - StateRootMessage::StateUpdate(update) => { - updates_received += 1; - trace!( - target: "engine::root", - len = update.len(), - total_updates = updates_received, - "Received new state update" - ); - Self::on_state_update( - self.config.consistent_view.clone(), - self.config.input.clone(), - update, - &mut self.state, - self.proof_sequencer.next_sequence(), - self.tx.clone(), - ); - } - StateRootMessage::ProofCalculated { proof, sequence_number } => { - proofs_processed += 1; - trace!( - target: "engine::root", - sequence = sequence_number, - total_proofs = proofs_processed, - "Processing calculated proof" - ); - - if let Some(combined_proof) = self.on_proof(proof, sequence_number) { - if self.calculating_root { - current_multiproof.extend(combined_proof); - } else { - self.spawn_root_calculation(combined_proof); + match self.state_stream.rx.try_recv() { + Ok(update) => { + stats.updates_received += 1; + trace!( + target: "engine::root", + len = update.len(), + total_updates = stats.updates_received, + "Received new state update" + ); + Self::on_state_update( + self.config.consistent_view.clone(), + self.config.input.clone(), + update, + &mut self.state, + self.proof_sequencer.next_sequence(), + self.tx.clone(), + ); + } + Err(mpsc::TryRecvError::Empty) => { + // No state updates available, try to process internal messages + match self.rx.recv() { + Ok(message) => { + if let Some(result) = self.handle_internal_message( + message, + &mut stats, + &mut current_multiproof, + &mut trie_updates, + &mut current_root, + ) { + return result; } } + Err(_) => return Self::handle_internal_error(), } - StateRootMessage::RootCalculated { root, updates, elapsed } => { - roots_calculated += 1; - trace!( - target: "engine::root", - %root, - ?elapsed, - roots_calculated, - proofs = proofs_processed, - updates = updates_received, - "Computed intermediate root" - ); - current_root = root; - trie_updates.extend(updates); - self.calculating_root = false; - - let has_new_proofs = !current_multiproof.account_subtree.is_empty() || - !current_multiproof.storages.is_empty(); - let all_proofs_received = proofs_processed >= updates_received; - let no_pending = !self.proof_sequencer.has_pending(); - - trace!( - target: "engine::root", - has_new_proofs, - all_proofs_received, - no_pending, - "State check" - ); + } + Err(mpsc::TryRecvError::Disconnected) => { + trace!( + target: "engine::root", + updates = stats.updates_received, + proofs = stats.proofs_processed, + "State stream closed" + ); - // only spawn new calculation if we have accumulated new proofs - if has_new_proofs { - trace!( - target: "engine::root", - account_proofs = current_multiproof.account_subtree.len(), - storage_proofs = current_multiproof.storages.len(), - "Spawning subsequent root calculation" - ); - self.spawn_root_calculation(std::mem::take(&mut current_multiproof)); - } else if input_stream_closed && all_proofs_received && no_pending { - debug!( - target: "engine::root", - total_updates = updates_received, - total_proofs = proofs_processed, - roots_calculated, - "All proofs processed, ending calculation" - ); - return Ok((current_root, trie_updates)); - } + // Check if we can finish immediately + if !self.calculating_root && + !self.proof_sequencer.has_pending() && + stats.proofs_processed >= stats.updates_received + { + return Ok((current_root, trie_updates)); } - StateRootMessage::InputStreamClosed => { - trace!( - target: "engine::root", - updates = updates_received, - proofs = proofs_processed, - "Input state stream closed" - ); - input_stream_closed = true; - - // check if we can finish immediately - if !self.calculating_root && - !self.proof_sequencer.has_pending() && - proofs_processed >= updates_received - { - return Ok((current_root, trie_updates)); + + // Otherwise, continue processing remaining proofs + match self.rx.recv() { + Ok(message) => { + if let Some(result) = self.handle_internal_message( + message, + &mut stats, + &mut current_multiproof, + &mut trie_updates, + &mut current_root, + ) { + return result; + } } + Err(_) => return Self::handle_internal_error(), } - }, - Err(_) => { - // this means our internal message channel is closed, which shouldn't happen - // in normal operation since we hold both ends - error!( - target: "engine::root", - "Internal message channel closed unexpectedly" - ); - return Err(ParallelStateRootError::Other( - "Internal message channel closed unexpectedly".into(), - )); } } }