From 3db84884e2776c53363911064ceb19858ae442bb Mon Sep 17 00:00:00 2001 From: Kolby Moroz Liebl <31669092+KolbyML@users.noreply.github.com> Date: Wed, 4 Dec 2024 08:21:07 -0700 Subject: [PATCH] feat: add state snapshot bridge (#1592) --- Cargo.lock | 28 +++ Cargo.toml | 2 + e2store/Cargo.toml | 1 + e2store/src/utils.rs | 16 +- portal-bridge/Cargo.toml | 1 + portal-bridge/README.md | 3 +- portal-bridge/src/bridge/state.rs | 224 +++++++++++++++++- portal-bridge/src/cli.rs | 6 + portal-bridge/src/main.rs | 1 + portal-bridge/src/types/mode.rs | 12 + trin-execution/Cargo.toml | 4 +- trin-execution/src/subcommands/era2/export.rs | 3 +- trin-execution/src/subcommands/era2/import.rs | 10 +- trin-execution/src/subcommands/era2/mod.rs | 1 + trin-execution/src/subcommands/era2/utils.rs | 93 ++++++++ trin-execution/src/trie_walker/db.rs | 10 +- 16 files changed, 397 insertions(+), 18 deletions(-) create mode 100644 trin-execution/src/subcommands/era2/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 961962f6d..8666a4693 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1952,6 +1952,7 @@ dependencies = [ "tokio", "tracing", "trin-utils", + "url", ] [[package]] @@ -2855,6 +2856,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humanize-duration" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "082247a9fa508369fe52b235aef8a8dbf931b08d223a1a9b70cd400a8a77ae9c" +dependencies = [ + "time", +] + [[package]] name = "humantime" version = "2.1.0" @@ -4420,6 +4430,7 @@ dependencies = [ "ethereum_ssz", "ethportal-api", "futures", + "humanize-duration", "itertools 0.13.0", "jsonrpsee", "portalnet", @@ -4899,10 +4910,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "windows-registry", ] @@ -6854,7 +6867,9 @@ dependencies = [ "e2store", "eth_trie", "ethportal-api", + "futures-util", "hashbrown 0.14.5", + "humanize-duration", "jsonrpsee", "lazy_static", "parking_lot 0.11.2", @@ -7424,6 +7439,19 @@ version = "0.2.97" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ee99da9c5ba11bd675621338ef6fa52296b76b83305e9b6e5c77d4c286d6d49" +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasm-timer" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 72e6f56a5..5f822b6ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,9 @@ ethereum_ssz = "0.7.1" ethereum_ssz_derive = "0.7.1" ethportal-api = { path = "ethportal-api" } futures = "0.3.23" +futures-util = "0.3.23" hex = "0.4.3" +humanize-duration = "0.0.6" itertools = "0.13.0" jsonrpsee = "0.24.4" keccak-hash = "0.10.0" diff --git a/e2store/Cargo.toml b/e2store/Cargo.toml index 29c00c199..8cce50bb3 100644 --- a/e2store/Cargo.toml +++ b/e2store/Cargo.toml @@ -25,6 +25,7 @@ tracing = { workspace = true, optional = true } trin-utils = { workspace = true, optional = true } scraper.workspace = true snap.workspace = true +url.workspace = true [dev-dependencies] rstest.workspace = true diff --git a/e2store/src/utils.rs b/e2store/src/utils.rs index 
b2a5afbfb..b062d649b 100644
--- a/e2store/src/utils.rs
+++ b/e2store/src/utils.rs
@@ -4,9 +4,11 @@ use anyhow::{ensure, Error};
 use rand::{seq::SliceRandom, thread_rng};
 use reqwest::Client;
 use scraper::{Html, Selector};
+use url::Url;
 
 const ERA_DIR_URL: &str = "https://mainnet.era.nimbus.team/";
 const ERA1_DIR_URL: &str = "https://era1.ethportal.net/";
+const ERA2_DIR_URL: &str = "https://era2.ethportal.net/index.html";
 pub const ERA1_FILE_COUNT: usize = 1897;
 
 pub fn underlying_io_error_kind(error: &Error) -> Option<io::ErrorKind> {
@@ -18,7 +20,7 @@ pub fn underlying_io_error_kind(error: &Error) -> Option<io::ErrorKind> {
     None
 }
 
-pub async fn download_era_links(
+async fn download_era_links(
     http_client: &Client,
     url: &str,
 ) -> anyhow::Result<HashMap<u64, String>> {
@@ -38,7 +40,12 @@ pub async fn download_era_links(
                 .expect("to be able to get epoch")
                 .parse::<u64>()
                 .expect("to be able to parse epoch");
-            (epoch_index, format!("{url}{href}"))
+            let url = Url::parse(url)
+                .and_then(|url| url.join(href))
+                .unwrap_or_else(|_| {
+                    panic!("to construct valid url from base ({url}) and href ({href}).")
+                });
+            (epoch_index, url.to_string())
         })
         .collect();
     Ok(era_files)
@@ -77,6 +84,11 @@ pub async fn get_era1_files(http_client: &Client) -> anyhow::Result<HashMap<u64, String>> {
+pub async fn get_era2_files(http_client: &Client) -> anyhow::Result<HashMap<u64, String>> {
+    let era2_files = download_era_links(http_client, ERA2_DIR_URL).await?;
+    Ok(era2_files)
+}
+
 /// Fetches era1 files hosted on era1.ethportal.net and shuffles them
 pub async fn get_shuffled_era1_files(http_client: &Client) -> anyhow::Result<Vec<String>> {
     let era1_files = get_era1_files(http_client).await?;
diff --git a/portal-bridge/Cargo.toml b/portal-bridge/Cargo.toml
index 26a41161e..f73e09ab6 100644
--- a/portal-bridge/Cargo.toml
+++ b/portal-bridge/Cargo.toml
@@ -24,6 +24,7 @@ eth_trie.workspace = true
 ethereum_ssz.workspace = true
 ethportal-api.workspace = true
 futures.workspace = true
+humanize-duration.workspace = true
 itertools.workspace = true
 jsonrpsee = { workspace = true, features = [
     "async-client",
diff --git a/portal-bridge/README.md b/portal-bridge/README.md
index 300317b39..ded1764a9 100644
--- a/portal-bridge/README.md
+++ b/portal-bridge/README.md
@@ -44,7 +44,6 @@ cargo run -p portal-bridge -- --executable-path ./target/debug/trin --epoch-accu
   - before gossiping a individual piece of content, the bridge will perform a lookup to see if the content is already in the portal network. If it is, the content will not be gossiped.
- `"--mode fourfours:single_hunter:10:50`: sample size = 10, threshold = 50
   - same as the above hunter mode, but it will only gossip a single era1 file before exiting
-
 #### Beacon Subnetwork
 
 - `"--mode latest"`: follow the head of the chain and gossip latest blocks
@@ -54,6 +53,8 @@ cargo run -p portal-bridge -- --executable-path ./target/debug/trin --epoch-accu
 
 - `"--mode single:b100"`: backfill, always beginning from block #0 until the specified block (#100)
 - `"--mode single:r50-100"`: backfill, gossips state diffs for blocks in #50-#100 range (inclusive)
+- `"--mode snapshot:1000000"`: gossips the state snapshot at the given block (here, the state snapshot at block 1,000,000). This mode is only used for the State Network.
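Roughly, the new snapshot mode works like this (see `StateBridge::launch_snapshot` further down in this patch): the bridge fetches the era2 index from era2.ethportal.net via `get_era2_files`, requires that the requested block exactly matches one of the published snapshots (otherwise it exits with an error listing the available blocks), downloads the matching `.era2` file with `download_with_progress`, imports it into a local trin-execution RocksDB database with `StateImporter`, and then walks the full account trie with `TrieWalker`, offering account, contract-bytecode, and contract-storage content to state network peers. The new `data_dir` option added to `portal-bridge/src/cli.rs` (presumably surfaced by clap as `--data-dir`) controls where that local database lives; keeping it around lets a bridge skip the import step on later runs for the same snapshot block.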
+ ### Subnetwork configuration diff --git a/portal-bridge/src/bridge/state.rs b/portal-bridge/src/bridge/state.rs index 54087a60b..acad77145 100644 --- a/portal-bridge/src/bridge/state.rs +++ b/portal-bridge/src/bridge/state.rs @@ -1,19 +1,27 @@ use std::{ + path::PathBuf, sync::{Arc, Mutex}, - time::Instant, + time::{Duration, Instant}, }; -use alloy::rlp::Decodable; -use eth_trie::{decode_node, node::Node, RootWithTrieDiff}; +use alloy::{consensus::EMPTY_ROOT_HASH, rlp::Decodable}; +use anyhow::ensure; +use e2store::utils::get_era2_files; +use eth_trie::{decode_node, node::Node, EthTrie, RootWithTrieDiff, Trie}; use ethportal_api::{ jsonrpsee::http_client::HttpClient, types::{ - network::Subnetwork, portal_wire::OfferTrace, - state_trie::account_state::AccountState as AccountStateInfo, + network::Subnetwork, portal_wire::OfferTrace, state_trie::account_state::AccountState, }, ContentValue, Enr, OverlayContentKey, StateContentKey, StateContentValue, StateNetworkApiClient, }; +use humanize_duration::{prelude::DurationExt, Truncate}; +use reqwest::{ + header::{HeaderMap, HeaderValue, CONTENT_TYPE}, + Client, +}; +use revm::{Database, DatabaseRef}; use revm_primitives::{keccak256, Bytecode, SpecId, B256}; use tokio::{ sync::{OwnedSemaphorePermit, Semaphore}, @@ -22,18 +30,27 @@ use tokio::{ use tracing::{debug, enabled, error, info, warn, Level}; use trin_evm::spec_id::get_spec_block_number; use trin_execution::{ + cli::{ImportStateConfig, APP_NAME}, config::StateConfig, content::{ create_account_content_key, create_account_content_value, create_contract_content_key, create_contract_content_value, create_storage_content_key, create_storage_content_value, }, execution::TrinExecution, + storage::{ + account_db::AccountDB, evm_db::EvmDB, execution_position::ExecutionPosition, + utils::setup_rocksdb, + }, + subcommands::era2::{ + import::StateImporter, + utils::{download_with_progress, percentage_from_address_hash}, + }, trie_walker::TrieWalker, types::{block_to_trace::BlockToTrace, trie_proof::TrieProof}, utils::full_nibble_path_to_address_hash, }; use trin_metrics::bridge::BridgeMetricsReporter; -use trin_utils::dir::create_temp_dir; +use trin_utils::dir::{create_temp_dir, setup_data_dir}; use crate::{ bridge::history::SERVE_BLOCK_TIMEOUT, @@ -55,6 +72,7 @@ pub struct StateBridge { global_offer_report: Arc>, // Bridge id used to determine which content keys to gossip bridge_id: BridgeId, + data_dir: Option, } impl StateBridge { @@ -64,6 +82,7 @@ impl StateBridge { offer_limit: usize, census: Census, bridge_id: BridgeId, + data_dir: Option, ) -> anyhow::Result { let metrics = BridgeMetricsReporter::new("state".to_string(), &format!("{mode:?}")); let offer_semaphore = Arc::new(Semaphore::new(offer_limit)); @@ -76,6 +95,7 @@ impl StateBridge { census, global_offer_report: Arc::new(Mutex::new(global_offer_report)), bridge_id, + data_dir, }) } @@ -85,7 +105,13 @@ impl StateBridge { // TODO: This should only gossip state trie at this block BridgeMode::Single(ModeType::Block(block)) => (0, block), BridgeMode::Single(ModeType::BlockRange(start_block, end_block)) => (start_block, end_block), - _ => panic!("State bridge only supports 'single' mode, for single block (implies 0..=block range) or range of blocks."), + BridgeMode::Snapshot(snapshot_block) => { + self.launch_snapshot(snapshot_block) + .await + .expect("State bridge failed"); + return; + }, + _ => panic!("State bridge only supports 'single' and `snapshot` mode, for single block (implies 0..=block range) or range of blocks, for snapshot mode 
provide the desired state snapshot to gossip"), }; if end_block > get_spec_block_number(SpecId::MERGE) { @@ -148,6 +174,88 @@ impl StateBridge { Ok(()) } + async fn launch_snapshot(&self, snapshot_block: u64) -> anyhow::Result<()> { + ensure!(snapshot_block > 0, "Snapshot block must be greater than 0"); + + // 1. Download the era2 file and import the state snapshot + let data_dir = setup_data_dir(APP_NAME, self.data_dir.clone(), false)?; + let next_block_number = { + let rocks_db = Arc::new(setup_rocksdb(&data_dir)?); + let execution_position = ExecutionPosition::initialize_from_db(rocks_db.clone())?; + execution_position.next_block_number() + }; + + if next_block_number == snapshot_block + 1 { + info!("State snapshot already imported. Skipping import."); + } else { + // Remove the existing database and create a new fresh folder + info!("Deleting existing database and importing state snapshot"); + std::fs::remove_dir_all(&data_dir) + .and_then(|_| std::fs::create_dir(&data_dir)) + .expect("Failed to delete existing database"); + + let http_client = Client::builder() + .default_headers(HeaderMap::from_iter([( + CONTENT_TYPE, + HeaderValue::from_static("application/xml"), + )])) + .build()?; + + let era2_files = get_era2_files(&http_client).await?; + let era2_blocks = era2_files.keys().cloned().collect::>(); + + ensure!( + era2_files.contains_key(&snapshot_block), + "Era2 file doesn't exist for requested snapshot block: try these {era2_blocks:?}" + ); + + info!( + "Downloading era2 file for snapshot block: {}", + snapshot_block + ); + + let path_to_era2 = data_dir.join(format!("era2-{snapshot_block}.bin")); + if let Err(e) = download_with_progress( + &http_client, + &era2_files[&snapshot_block], + path_to_era2.clone(), + ) + .await + { + return Err(anyhow::anyhow!("Failed to download era2 file: {e}")); + }; + + let import_state_config = ImportStateConfig { path_to_era2 }; + + let state_importer = StateImporter::new(import_state_config, &data_dir).await?; + let header = state_importer.import().await?; + info!( + "Imported state from era2: {} {}", + header.number, header.state_root, + ); + } + + info!("State snapshot imported successfully, starting state bridge"); + + // 2. Start the state bridge + + let rocks_db = Arc::new(setup_rocksdb(&data_dir)?); + + let execution_position = ExecutionPosition::initialize_from_db(rocks_db.clone())?; + ensure!( + execution_position.next_block_number() > 0, + "Trin execution not initialized!" 
+ ); + + let mut evm_db = EvmDB::new(StateConfig::default(), rocks_db, &execution_position) + .expect("Failed to create EVM database"); + + self.gossip_whole_state_snapshot(&mut evm_db, execution_position) + .await + .expect("State bridge failed"); + Ok(()) + } + async fn gossip_trie_diff( &self, root_with_trie_diff: RootWithTrieDiff, @@ -183,7 +291,7 @@ impl StateBridge { let Node::Leaf(leaf) = decoded_node else { continue; }; - let account: AccountStateInfo = Decodable::decode(&mut leaf.value.as_slice())?; + let account: AccountState = Decodable::decode(&mut leaf.value.as_slice())?; // reconstruct the address hash from the path so that we can fetch the // address from the database @@ -238,6 +346,106 @@ impl StateBridge { Ok(()) } + async fn gossip_whole_state_snapshot( + &self, + evm_db: &mut EvmDB, + execution_position: ExecutionPosition, + ) -> anyhow::Result<()> { + let start = Instant::now(); + + let number = execution_position.next_block_number() - 1; + let block_hash = evm_db.block_hash(number)?; + + let mut leaf_count = 0; + + let root_hash = evm_db.trie.lock().root_hash()?; + let mut content_idx = 0; + let state_walker = TrieWalker::new(root_hash, evm_db.trie.lock().db.clone())?; + for account_proof in state_walker { + // gossip the account + self.gossip_account(&account_proof, block_hash, content_idx) + .await?; + content_idx += 1; + + let Some(encoded_last_node) = account_proof.proof.last() else { + panic!("Account proof is empty"); + }; + + let Node::Leaf(leaf) = decode_node(&mut encoded_last_node.as_ref())? else { + continue; + }; + + let account: AccountState = Decodable::decode(&mut leaf.value.as_slice())?; + + // reconstruct the address hash from the path so that we can fetch the + // address from the database + let mut partial_key_path = leaf.key.get_data().to_vec(); + partial_key_path.pop(); + let full_key_path = [&account_proof.path.clone(), partial_key_path.as_slice()].concat(); + let address_hash = full_nibble_path_to_address_hash(&full_key_path); + + leaf_count += 1; + if leaf_count % 100 == 0 { + let elapsed_secs = start.elapsed().as_secs_f64(); + let percentage_done = percentage_from_address_hash(address_hash); + + if percentage_done > 0.0 { + let estimated_total_time = elapsed_secs / (percentage_done / 100.0); + let eta_secs = Duration::from_secs_f64(estimated_total_time - elapsed_secs); + + info!( + "Processed {leaf_count} leaves, {:.2}% done, ETA: {}, last address_hash processed: {address_hash}", + percentage_done, eta_secs.human(Truncate::Second) + ); + } else { + info!( + "Processed {leaf_count} leaves, {:.2}% done, ETA: calculating..., last address_hash processed: {address_hash}", + percentage_done + ); + } + } + + // check contract code content key/value + if account.code_hash != keccak256([]) { + let code: Bytecode = evm_db.code_by_hash_ref(account.code_hash)?; + + self.gossip_contract_bytecode( + address_hash, + &account_proof, + block_hash, + account.code_hash, + code, + content_idx, + ) + .await?; + content_idx += 1; + } + + // check contract storage content key/value + if account.storage_root != EMPTY_ROOT_HASH { + let account_db = AccountDB::new(address_hash, evm_db.db.clone()); + let trie = EthTrie::from(Arc::new(account_db), account.storage_root)?.db; + + let storage_walker = TrieWalker::new(account.storage_root, trie)?; + for storage_proof in storage_walker { + self.gossip_storage( + &account_proof, + &storage_proof, + address_hash, + block_hash, + content_idx, + ) + .await?; + content_idx += 1; + } + } + } + + info!("Took {} seconds to complete", 
start.elapsed().as_secs()); + + Ok(()) + } + async fn gossip_account( &self, account_proof: &TrieProof, diff --git a/portal-bridge/src/cli.rs b/portal-bridge/src/cli.rs index b06771540..c5c6b58db 100644 --- a/portal-bridge/src/cli.rs +++ b/portal-bridge/src/cli.rs @@ -188,6 +188,12 @@ pub struct BridgeConfig { default_value = "1/1" )] pub bridge_id: BridgeId, + + #[arg( + long, + help = "The directory for storing trin-execution data, useful for storing state in non standard locations." + )] + pub data_dir: Option, } /// Used to identify the bridge amongst a set of bridges, diff --git a/portal-bridge/src/main.rs b/portal-bridge/src/main.rs index 5a4bb6b6a..120a48804 100644 --- a/portal-bridge/src/main.rs +++ b/portal-bridge/src/main.rs @@ -59,6 +59,7 @@ async fn main() -> Result<(), Box> { bridge_config.offer_limit, census, bridge_config.bridge_id, + bridge_config.data_dir, ) .await?; diff --git a/portal-bridge/src/types/mode.rs b/portal-bridge/src/types/mode.rs index 52a8ebb61..3e033a420 100644 --- a/portal-bridge/src/types/mode.rs +++ b/portal-bridge/src/types/mode.rs @@ -14,6 +14,8 @@ use trin_validation::constants::EPOCH_SIZE; /// - ex: "r10-12" backfills a block range from #10 to #12 (inclusive) /// - FourFours: gossips randomly sequenced era1 files /// - ex: "fourfours" +/// - Snapshot: gossips a State snapshot, this mode is only used for the state network +/// - ex: "snapshot:1000000" gossips the state snapshot at block 1000000 #[derive(Clone, Debug, PartialEq, Default, Eq)] pub enum BridgeMode { #[default] @@ -21,6 +23,7 @@ pub enum BridgeMode { FourFours(FourFoursMode), Backfill(ModeType), Single(ModeType), + Snapshot(u64), Test(PathBuf), } @@ -42,6 +45,9 @@ impl BridgeMode { BridgeMode::Test(_) => { return Err(anyhow!("BridgeMode `test` does not have a block range")) } + BridgeMode::Snapshot(_) => { + return Err(anyhow!("BridgeMode `snapshot` does not have a block range")) + } }; let (start, end) = match mode_type.clone() { ModeType::Epoch(epoch_number) => { @@ -96,6 +102,12 @@ impl FromStr for BridgeMode { let mode_type = ModeType::from_str(&val[1..])?; Ok(BridgeMode::Single(mode_type)) } + "snapshot" => { + let snapshot = val[1..] 
+ .parse() + .map_err(|_| "Invalid snapshot arg: snapshot number")?; + Ok(BridgeMode::Snapshot(snapshot)) + } "test" => { let path = PathBuf::from_str(&val[1..]).map_err(|_| "Invalid test asset path")?; diff --git a/trin-execution/Cargo.toml b/trin-execution/Cargo.toml index 52a959c69..1989dd052 100644 --- a/trin-execution/Cargo.toml +++ b/trin-execution/Cargo.toml @@ -20,13 +20,15 @@ clap.workspace = true ethportal-api.workspace = true e2store.workspace = true eth_trie.workspace = true +futures-util.workspace = true hashbrown = "0.14.0" +humanize-duration.workspace = true jsonrpsee = { workspace = true, features = ["async-client", "client", "macros", "server"]} lazy_static.workspace = true parking_lot.workspace = true prometheus_exporter.workspace = true rayon = "1.10.0" -reqwest.workspace = true +reqwest = { workspace = true, features = ["stream"] } revm.workspace = true revm-inspectors = "0.8.1" revm-primitives.workspace = true diff --git a/trin-execution/src/subcommands/era2/export.rs b/trin-execution/src/subcommands/era2/export.rs index 090572023..cfabf25e6 100644 --- a/trin-execution/src/subcommands/era2/export.rs +++ b/trin-execution/src/subcommands/era2/export.rs @@ -22,6 +22,7 @@ use crate::{ account_db::AccountDB, evm_db::EvmDB, execution_position::ExecutionPosition, utils::setup_rocksdb, }, + subcommands::era2::utils::percentage_from_address_hash, }; pub struct StateExporter { @@ -123,7 +124,7 @@ impl StateExporter { accounts_exported += 1; if accounts_exported % 10000 == 0 { - info!("Processed {} accounts", accounts_exported); + info!("Processed {accounts_exported} leaves, {:.2}% done, last address_hash processed: {account_hash}", percentage_from_address_hash(account_hash)); } } diff --git a/trin-execution/src/subcommands/era2/import.rs b/trin-execution/src/subcommands/era2/import.rs index d08e11516..fd4b6b1e4 100644 --- a/trin-execution/src/subcommands/era2/import.rs +++ b/trin-execution/src/subcommands/era2/import.rs @@ -16,6 +16,7 @@ use crate::{ account_db::AccountDB, evm_db::EvmDB, execution_position::ExecutionPosition, utils::setup_rocksdb, }, + subcommands::era2::utils::percentage_from_address_hash, }; pub struct StateImporter { @@ -53,7 +54,7 @@ impl StateImporter { Ok(header) } - pub fn import_state(&self) -> anyhow::Result
<Header> {
+    fn import_state(&self) -> anyhow::Result<Header>
{ info!("Importing state from .era2 file"); let mut era2 = Era2Reader::open(&self.config.path_to_era2)?; @@ -116,7 +117,8 @@ impl StateImporter { accounts_imported += 1; if accounts_imported % 1000 == 0 { - info!("Imported {} accounts", accounts_imported); + info!("Processed {accounts_imported} accounts, {:.2}% done, last address_hash processed: {address_hash}", percentage_from_address_hash(address_hash)); + info!("Committing changes to database"); self.evm_db.trie.lock().root_hash()?; info!("Finished committing changes to database"); @@ -135,10 +137,10 @@ impl StateImporter { } /// insert the last 256 block hashes into the database - pub async fn import_last_256_block_hashes(&self, block_number: u64) -> anyhow::Result<()> { + async fn import_last_256_block_hashes(&self, block_number: u64) -> anyhow::Result<()> { let first_block_hash_to_add = block_number.saturating_sub(BLOCKHASH_SERVE_WINDOW); let mut era_manager = EraManager::new(first_block_hash_to_add).await?; - while era_manager.next_block_number() < block_number { + while era_manager.next_block_number() <= block_number { let block = era_manager.get_next_block().await?; self.evm_db.db.put( keccak256(B256::from(U256::from(block.header.number))), diff --git a/trin-execution/src/subcommands/era2/mod.rs b/trin-execution/src/subcommands/era2/mod.rs index c687afe56..f8eee58cc 100644 --- a/trin-execution/src/subcommands/era2/mod.rs +++ b/trin-execution/src/subcommands/era2/mod.rs @@ -1,2 +1,3 @@ pub mod export; pub mod import; +pub mod utils; diff --git a/trin-execution/src/subcommands/era2/utils.rs b/trin-execution/src/subcommands/era2/utils.rs new file mode 100644 index 000000000..af94c1879 --- /dev/null +++ b/trin-execution/src/subcommands/era2/utils.rs @@ -0,0 +1,93 @@ +use std::{ + fs::File, + io::Write, + path::PathBuf, + time::{Duration, Instant}, +}; + +use alloy::primitives::B256; +use futures_util::StreamExt; +use humanize_duration::{prelude::DurationExt, Truncate}; +use reqwest::Client; +use tracing::{info, warn}; + +/// Returns the percentage of the address hash in the total address space. 
+/// We can assume the address hashes are evenly distributed, due to the
+/// properties of the keccak256 hash function.
+/// This should only be used when walking the trie linearly.
+pub fn percentage_from_address_hash(address_hash: B256) -> f64 {
+    let mut be_bytes = [0u8; 4];
+    be_bytes.copy_from_slice(&address_hash.0[..4]);
+    let address_hash_bytes = u32::from_be_bytes(be_bytes);
+    address_hash_bytes as f64 / u32::MAX as f64 * 100.0
+}
+
+const MEGABYTE: f64 = 1024.0 * 1024.0;
+
+pub async fn download_with_progress(
+    http_client: &Client,
+    url: &str,
+    output_path: PathBuf,
+) -> Result<(), Box<dyn std::error::Error>> {
+    info!("Downloading from: {url}");
+
+    let response = http_client
+        .get(url)
+        .send()
+        .await
+        .or(Err(format!("Failed to GET from '{}'", &url)))?;
+
+    // Get total size of the content
+    let total_size = response.content_length().unwrap_or(0);
+    if total_size == 0 {
+        warn!("Failed to get content length; progress will not be estimated.");
+    }
+
+    let mut file = File::create(output_path.clone())?;
+
+    let mut last_report_time = Instant::now();
+    let mut amount_downloaded: u64 = 0;
+    let mut last_amount_downloaded: u64 = 0;
+
+    // Stream the response body
+    let mut stream = response.bytes_stream();
+
+    while let Some(chunk) = stream.next().await {
+        let chunk = chunk?;
+        file.write_all(&chunk)?;
+        amount_downloaded += chunk.len() as u64;
+
+        // Check if enough time has passed since the last report
+        if last_report_time.elapsed() >= Duration::from_secs(2) {
+            let interval_last_report_time = last_report_time.elapsed().as_secs_f64();
+            let interval_downloaded = amount_downloaded - last_amount_downloaded;
+            let current_speed = interval_downloaded as f64 / interval_last_report_time; // Bytes per second
+
+            // Log progress with estimated time remaining
+            if total_size > 0 {
+                let remaining_bytes = total_size.saturating_sub(amount_downloaded) as f64;
+                let remaining_time = Duration::from_secs_f64(remaining_bytes / current_speed);
+                info!(
+                    "amount_downloaded {:.2}/{:.2} MB | Speed: {:.2} MB/s | ETA: {}",
+                    amount_downloaded as f64 / MEGABYTE,
+                    total_size as f64 / MEGABYTE,
+                    current_speed / MEGABYTE,
+                    remaining_time.human(Truncate::Second)
+                );
+            } else {
+                info!(
+                    "amount_downloaded {:.2} MB | Speed: {:.2} MB/s",
+                    amount_downloaded as f64 / MEGABYTE,
+                    current_speed / MEGABYTE
+                );
+            }
+
+            // Update last report time
+            last_report_time = Instant::now();
+            last_amount_downloaded = amount_downloaded;
+        }
+    }
+
+    info!("Download complete: {:?}", output_path);
+    Ok(())
+}
diff --git a/trin-execution/src/trie_walker/db.rs b/trin-execution/src/trie_walker/db.rs
index dbd4fc184..5af388d25 100644
--- a/trin-execution/src/trie_walker/db.rs
+++ b/trin-execution/src/trie_walker/db.rs
@@ -3,7 +3,7 @@ use anyhow::anyhow;
 use eth_trie::DB;
 use hashbrown::HashMap;
 
-use crate::storage::trie_db::TrieRocksDB;
+use crate::storage::{account_db::AccountDB, trie_db::TrieRocksDB};
 
 pub trait TrieWalkerDb {
     fn get(&self, key: &[u8]) -> anyhow::Result<Option<Bytes>>;
@@ -22,3 +22,11 @@ impl TrieWalkerDb for TrieRocksDB {
             .map_err(|err| anyhow!("Failed to read key value from TrieRocksDB {err}"))
     }
 }
+
+impl TrieWalkerDb for AccountDB {
+    fn get(&self, key: &[u8]) -> anyhow::Result<Option<Bytes>> {
+        DB::get(self, key)
+            .map(|result| result.map(Bytes::from))
+            .map_err(|err| anyhow!("Failed to read key value from AccountDB {err}"))
+    }
+}
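The progress reporting added in this patch (the export/import log lines, the ETA in `gossip_whole_state_snapshot`, and `percentage_from_address_hash` above) leans on one observation: keccak256 output is uniformly distributed, so when the account trie is walked in key order, the high-order bytes of the last address hash visited approximate the fraction of the key space already covered. A minimal, self-contained sketch of that math (standalone Rust, not trin code; a `[u8; 32]` array stands in for alloy's `B256`):

```rust
use std::time::Duration;

/// Same idea as `percentage_from_address_hash`: interpret the first 4 bytes
/// of the 32-byte hash as a big-endian u32 and scale it to 0..=100 %.
fn percentage_from_hash(hash: &[u8; 32]) -> f64 {
    let prefix = u32::from_be_bytes([hash[0], hash[1], hash[2], hash[3]]);
    prefix as f64 / u32::MAX as f64 * 100.0
}

/// Same shape as the ETA computation in `gossip_whole_state_snapshot`:
/// extrapolate total time from elapsed time and the fraction completed.
fn eta(elapsed: Duration, percentage_done: f64) -> Option<Duration> {
    if percentage_done <= 0.0 {
        return None; // nothing processed yet, cannot extrapolate
    }
    let elapsed_secs = elapsed.as_secs_f64();
    let estimated_total_secs = elapsed_secs / (percentage_done / 100.0);
    Some(Duration::from_secs_f64(estimated_total_secs - elapsed_secs))
}

fn main() {
    // Pretend the walk has just passed address hash 0x4000...00,
    // i.e. roughly a quarter of the 2^256 key space, after one hour.
    let mut last_hash = [0u8; 32];
    last_hash[0] = 0x40;

    let done = percentage_from_hash(&last_hash);
    let remaining = eta(Duration::from_secs(3600), done).expect("done > 0");
    println!("{done:.2}% done, ~{:.0}s remaining", remaining.as_secs_f64());
    // Prints: 25.00% done, ~10800s remaining
}
```

Using only the first four bytes keeps the arithmetic in `u32`/`f64` while still giving a resolution of roughly 2.3e-8 %, far finer than a progress log needs. The download ETA in `download_with_progress` follows the same spirit but extrapolates from the current transfer speed and the bytes remaining instead of from key-space position.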