From 2ffe7c888facdbf4a3d6808c9b693024608f51be Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 20 Nov 2024 16:34:34 +0100 Subject: [PATCH] prevent state root calculation and returning final result without the ongoing proof calculations received --- crates/engine/tree/src/tree/root.rs | 60 ++++++++++++++++++----------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 6e6713a463223..300cb46bd37c2 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -116,6 +116,9 @@ pub(crate) struct StateRootTask { state: HashedPostState, /// Channels to retrieve proof calculation results from. pending_proofs: VecDeque>>, + /// Prevents triggering state root calculation and returning the final result + /// without all the ongoing proof calculations received. + pending_calculation: bool, } #[allow(dead_code)] @@ -125,7 +128,13 @@ where { /// Creates a new `StateRootTask`. pub(crate) fn new(config: StateRootConfig, state_stream: StdReceiverStream) -> Self { - Self { config, state_stream, state: Default::default(), pending_proofs: Default::default() } + Self { + config, + state_stream, + state: Default::default(), + pending_proofs: Default::default(), + pending_calculation: false, + } } /// Spawns the state root task and returns a handle to await its result. @@ -222,7 +231,7 @@ where } Err(mpsc::TryRecvError::Disconnected) => { // state stream closed, check if we can finish - if self.pending_proofs.is_empty() { + if self.pending_proofs.is_empty() && !self.pending_calculation { if let StateRootTaskState::Idle(_multiproof, state_root) = &task_state { return Ok((*state_root, trie_updates)); } @@ -237,6 +246,7 @@ where let multiproof = result?; task_state.add_proofs(multiproof); self.pending_proofs.pop_front(); + self.pending_calculation = true; continue; } Err(mpsc::TryRecvError::Empty) => { @@ -277,27 +287,31 @@ where } } StateRootTaskState::Idle(multiproof, _) => { - debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task"); - let view = self.config.consistent_view.clone(); - let input_nodes_sorted = self.config.input.nodes.clone().into_sorted(); - let input_state_sorted = self.config.input.state.clone().into_sorted(); - let multiproof = std::mem::take(multiproof); - let state = self.state.clone(); - let (tx, rx) = mpsc::sync_channel(1); - - rayon::spawn(move || { - let result = calculate_state_root_from_proofs( - view, - &input_nodes_sorted, - &input_state_sorted, - multiproof, - state, - ); - let _ = tx.send(result); - }); - - task_state = StateRootTaskState::Pending(Default::default(), rx); - continue; + if self.pending_calculation { + debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task"); + let view = self.config.consistent_view.clone(); + let input_nodes_sorted = self.config.input.nodes.clone().into_sorted(); + let input_state_sorted = self.config.input.state.clone().into_sorted(); + let multiproof = std::mem::take(multiproof); + let state = self.state.clone(); + let (tx, rx) = mpsc::sync_channel(1); + + rayon::spawn(move || { + let result = calculate_state_root_from_proofs( + view, + &input_nodes_sorted, + &input_state_sorted, + multiproof, + state, + ); + let _ = tx.send(result); + }); + + self.pending_calculation = false; + + task_state = StateRootTaskState::Pending(Default::default(), rx); + continue; + } } } }