Skip to content

Commit

Permalink
refactor: remove incoming state updates thread
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed Nov 25, 2024
1 parent 157258a commit 23a1c75
Showing 1 changed file with 151 additions and 141 deletions.
292 changes: 151 additions & 141 deletions crates/engine/tree/src/tree/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::{
mpsc::{self, Receiver, RecvError, Sender},
Arc,
},
thread,
time::{Duration, Instant},
};
use tracing::{debug, error, trace};
Expand Down Expand Up @@ -69,6 +68,7 @@ pub(crate) struct StateRootConfig<Factory> {
}

/// Wrapper for std channel receiver to maintain compatibility with `UnboundedReceiverStream`
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct StdReceiverStream {
rx: Receiver<EvmState>,
Expand All @@ -88,10 +88,6 @@ impl StdReceiverStream {
/// Messages that can be received by the state root task
#[derive(Debug)]
pub(crate) enum StateRootMessage {
/// New state update from transaction execution
StateUpdate(EvmState),
/// Input state stream has closed
InputStreamClosed,
/// Proof calculation completed for a specific state update
ProofCalculated {
/// The calculated proof
Expand Down Expand Up @@ -158,6 +154,14 @@ impl ProofSequencer {
}
}

/// Statistics tracking.
#[derive(Debug, Default)]
struct TaskStats {
updates_received: u64,
proofs_processed: u64,
roots_calculated: u64,
}

/// Standalone task that receives a transaction state stream and updates relevant
/// data structures to calculate state root.
///
Expand All @@ -180,8 +184,8 @@ pub(crate) struct StateRootTask<Factory> {
proof_sequencer: ProofSequencer,
/// Whether we're currently calculating a root
calculating_root: bool,
/// Background thread handle for input state stream processing
_stream_handler: Option<thread::JoinHandle<()>>,
/// Incoming state updates.
state_stream: StdReceiverStream,
}

#[allow(dead_code)]
Expand All @@ -193,38 +197,14 @@ where
pub(crate) fn new(config: StateRootConfig<Factory>, state_stream: StdReceiverStream) -> Self {
let (tx, rx) = mpsc::channel();

// spawn a background thread to forward state updates from StdReceiverStream
// to our internal message channel
let stream_tx = tx.clone();
let stream_handler = thread::Builder::new()
.name("State Stream Handler".to_string())
.spawn(move || {
loop {
match state_stream.recv() {
Ok(state) => {
if stream_tx.send(StateRootMessage::StateUpdate(state)).is_err() {
break;
}
}
Err(_) => {
// stream closed normally
debug!(target: "engine::root", "State stream closed normally");
let _ = stream_tx.send(StateRootMessage::InputStreamClosed);
break;
}
}
}
})
.expect("failed to spawn stream handler thread");

Self {
config,
rx,
tx,
state: Default::default(),
proof_sequencer: ProofSequencer::new(),
calculating_root: false,
_stream_handler: Some(stream_handler),
state_stream,
}
}

Expand Down Expand Up @@ -367,128 +347,158 @@ where
});
}

/// Handles internal messages (proofs and root calculations) and tracks state
fn handle_internal_message(
&mut self,
message: StateRootMessage,
stats: &mut TaskStats,
current_multiproof: &mut MultiProof,
trie_updates: &mut TrieUpdates,
current_root: &mut B256,
) -> Option<StateRootResult> {
match message {
StateRootMessage::ProofCalculated { proof, sequence_number } => {
stats.proofs_processed += 1;
trace!(
target: "engine::root",
sequence = sequence_number,
total_proofs = stats.proofs_processed,
"Processing calculated proof"
);

if let Some(combined_proof) = self.on_proof(proof, sequence_number) {
if self.calculating_root {
current_multiproof.extend(combined_proof);
} else {
self.spawn_root_calculation(combined_proof);
}
}
None
}
StateRootMessage::RootCalculated { root, updates, elapsed } => {
stats.roots_calculated += 1;
trace!(
target: "engine::root",
%root,
?elapsed,
roots_calculated = stats.roots_calculated,
proofs = stats.proofs_processed,
updates = stats.updates_received,
"Computed intermediate root"
);
*current_root = root;
trie_updates.extend(updates);
self.calculating_root = false;

let has_new_proofs = !current_multiproof.account_subtree.is_empty() ||
!current_multiproof.storages.is_empty();
let all_proofs_received = stats.proofs_processed >= stats.updates_received;
let no_pending = !self.proof_sequencer.has_pending();

if has_new_proofs {
trace!(
target: "engine::root",
account_proofs = current_multiproof.account_subtree.len(),
storage_proofs = current_multiproof.storages.len(),
"Spawning subsequent root calculation"
);
self.spawn_root_calculation(std::mem::take(current_multiproof));
None
} else if all_proofs_received && no_pending {
debug!(
target: "engine::root",
total_updates = stats.updates_received,
total_proofs = stats.proofs_processed,
roots_calculated = stats.roots_calculated,
"All proofs processed, ending calculation"
);
Some(Ok((*current_root, trie_updates.clone())))
} else {
None
}
}
}
}

/// Handle internal message channel errors
fn handle_internal_error() -> StateRootResult {
error!(target: "engine::root", "Internal message channel closed unexpectedly");
Err(ParallelStateRootError::Other("Internal message channel closed unexpectedly".into()))
}

fn run(mut self) -> StateRootResult {
let mut stats = TaskStats::default();
let mut current_multiproof = MultiProof::default();
let mut trie_updates = TrieUpdates::default();
let mut current_root = B256::default();
let mut updates_received = 0;
let mut proofs_processed = 0;
let mut roots_calculated = 0;
let mut input_stream_closed = false;

loop {
match self.rx.recv() {
Ok(message) => match message {
StateRootMessage::StateUpdate(update) => {
updates_received += 1;
trace!(
target: "engine::root",
len = update.len(),
total_updates = updates_received,
"Received new state update"
);
Self::on_state_update(
self.config.consistent_view.clone(),
self.config.input.clone(),
update,
&mut self.state,
self.proof_sequencer.next_sequence(),
self.tx.clone(),
);
}
StateRootMessage::ProofCalculated { proof, sequence_number } => {
proofs_processed += 1;
trace!(
target: "engine::root",
sequence = sequence_number,
total_proofs = proofs_processed,
"Processing calculated proof"
);

if let Some(combined_proof) = self.on_proof(proof, sequence_number) {
if self.calculating_root {
current_multiproof.extend(combined_proof);
} else {
self.spawn_root_calculation(combined_proof);
match self.state_stream.rx.try_recv() {
Ok(update) => {
stats.updates_received += 1;
trace!(
target: "engine::root",
len = update.len(),
total_updates = stats.updates_received,
"Received new state update"
);
Self::on_state_update(
self.config.consistent_view.clone(),
self.config.input.clone(),
update,
&mut self.state,
self.proof_sequencer.next_sequence(),
self.tx.clone(),
);
}
Err(mpsc::TryRecvError::Empty) => {
// No state updates available, try to process internal messages
match self.rx.recv() {
Ok(message) => {
if let Some(result) = self.handle_internal_message(
message,
&mut stats,
&mut current_multiproof,
&mut trie_updates,
&mut current_root,
) {
return result;
}
}
Err(_) => return Self::handle_internal_error(),
}
StateRootMessage::RootCalculated { root, updates, elapsed } => {
roots_calculated += 1;
trace!(
target: "engine::root",
%root,
?elapsed,
roots_calculated,
proofs = proofs_processed,
updates = updates_received,
"Computed intermediate root"
);
current_root = root;
trie_updates.extend(updates);
self.calculating_root = false;

let has_new_proofs = !current_multiproof.account_subtree.is_empty() ||
!current_multiproof.storages.is_empty();
let all_proofs_received = proofs_processed >= updates_received;
let no_pending = !self.proof_sequencer.has_pending();

trace!(
target: "engine::root",
has_new_proofs,
all_proofs_received,
no_pending,
"State check"
);
}
Err(mpsc::TryRecvError::Disconnected) => {
trace!(
target: "engine::root",
updates = stats.updates_received,
proofs = stats.proofs_processed,
"State stream closed"
);

// only spawn new calculation if we have accumulated new proofs
if has_new_proofs {
trace!(
target: "engine::root",
account_proofs = current_multiproof.account_subtree.len(),
storage_proofs = current_multiproof.storages.len(),
"Spawning subsequent root calculation"
);
self.spawn_root_calculation(std::mem::take(&mut current_multiproof));
} else if input_stream_closed && all_proofs_received && no_pending {
debug!(
target: "engine::root",
total_updates = updates_received,
total_proofs = proofs_processed,
roots_calculated,
"All proofs processed, ending calculation"
);
return Ok((current_root, trie_updates));
}
// Check if we can finish immediately
if !self.calculating_root &&
!self.proof_sequencer.has_pending() &&
stats.proofs_processed >= stats.updates_received
{
return Ok((current_root, trie_updates));
}
StateRootMessage::InputStreamClosed => {
trace!(
target: "engine::root",
updates = updates_received,
proofs = proofs_processed,
"Input state stream closed"
);
input_stream_closed = true;

// check if we can finish immediately
if !self.calculating_root &&
!self.proof_sequencer.has_pending() &&
proofs_processed >= updates_received
{
return Ok((current_root, trie_updates));

// Otherwise, continue processing remaining proofs
match self.rx.recv() {
Ok(message) => {
if let Some(result) = self.handle_internal_message(
message,
&mut stats,
&mut current_multiproof,
&mut trie_updates,
&mut current_root,
) {
return result;
}
}
Err(_) => return Self::handle_internal_error(),
}
},
Err(_) => {
// this means our internal message channel is closed, which shouldn't happen
// in normal operation since we hold both ends
error!(
target: "engine::root",
"Internal message channel closed unexpectedly"
);
return Err(ParallelStateRootError::Other(
"Internal message channel closed unexpectedly".into(),
));
}
}
}
Expand Down

0 comments on commit 23a1c75

Please sign in to comment.