diff --git a/Cargo.lock b/Cargo.lock index e73066ba60..de8b87acfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3318,6 +3318,7 @@ dependencies = [ "dojo-world", "jsonrpsee 0.16.3", "katana-core", + "katana-executor", "katana-primitives", "katana-rpc", "katana-rpc-api", @@ -6534,6 +6535,7 @@ dependencies = [ "common", "console", "katana-core", + "katana-executor", "katana-primitives", "katana-rpc", "katana-rpc-api", @@ -6578,6 +6580,7 @@ dependencies = [ "cairo-lang-starknet", "cairo-vm", "convert_case 0.6.0", + "derive_more", "ethers", "flate2", "futures", @@ -6586,6 +6589,7 @@ dependencies = [ "katana-executor", "katana-primitives", "katana-provider", + "katana-tasks", "lazy_static", "parking_lot 0.12.1", "primitive-types", @@ -6627,7 +6631,6 @@ dependencies = [ name = "katana-executor" version = "0.6.0-alpha.5" dependencies = [ - "anyhow", "blockifier", "cairo-vm", "convert_case 0.6.0", @@ -6641,7 +6644,6 @@ dependencies = [ "starknet 0.9.0", "starknet_api", "thiserror", - "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index e1f4691201..9527e605f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "crates/katana/storage/codecs/derive", "crates/katana/storage/db", "crates/katana/storage/provider", + "crates/katana/tasks", "crates/metrics", "crates/saya/core", "crates/sozo/signers", @@ -68,9 +69,9 @@ dojo-world = { path = "crates/dojo-world" } # katana katana-codecs = { path = "crates/katana/storage/codecs" } katana-codecs-derive = { path = "crates/katana/storage/codecs/derive" } -katana-core = { path = "crates/katana/core" } +katana-core = { path = "crates/katana/core", default-features = false } katana-db = { path = "crates/katana/storage/db" } -katana-executor = { path = "crates/katana/executor" } +katana-executor = { path = "crates/katana/executor", default-features = false } katana-primitives = { path = "crates/katana/primitives" } katana-provider = { path = "crates/katana/storage/provider" } katana-rpc = { path = "crates/katana/rpc/rpc" } @@ -78,6 +79,7 @@ katana-rpc-api = { path = "crates/katana/rpc/rpc-api" } katana-rpc-types = { path = "crates/katana/rpc/rpc-types" } katana-rpc-types-builder = { path = "crates/katana/rpc/rpc-types-builder" } katana-runner = { path = "crates/katana/runner" } +katana-tasks = { path = "crates/katana/tasks" } # torii torii-client = { path = "crates/torii/client" } diff --git a/bin/katana/Cargo.toml b/bin/katana/Cargo.toml index 59e26a9617..b208bb03d5 100644 --- a/bin/katana/Cargo.toml +++ b/bin/katana/Cargo.toml @@ -13,6 +13,7 @@ clap_complete.workspace = true common.workspace = true console.workspace = true katana-core.workspace = true +katana-executor.workspace = true katana-primitives.workspace = true katana-rpc-api.workspace = true katana-rpc.workspace = true diff --git a/bin/katana/src/main.rs b/bin/katana/src/main.rs index 506c91d014..1c20a4c0f2 100644 --- a/bin/katana/src/main.rs +++ b/bin/katana/src/main.rs @@ -5,9 +5,13 @@ use std::sync::Arc; use clap::{CommandFactory, Parser}; use clap_complete::{generate, Shell}; use console::Style; +use katana_core::constants::MAX_RECURSION_DEPTH; +use katana_core::env::get_default_vm_resource_fee_cost; use katana_core::sequencer::KatanaSequencer; +use katana_executor::SimulationFlag; use katana_primitives::class::ClassHash; use katana_primitives::contract::ContractAddress; +use katana_primitives::env::{CfgEnv, FeeTokenAddressses}; use katana_primitives::genesis::allocation::GenesisAccountAlloc; use katana_primitives::genesis::Genesis; use katana_rpc::{spawn, NodeHandle}; @@ -39,7 +43,29 @@ async fn main() -> Result<(), Box> { let sequencer_config = args.sequencer_config(); let starknet_config = args.starknet_config(); - let sequencer = Arc::new(KatanaSequencer::new(sequencer_config, starknet_config).await?); + let cfg_env = CfgEnv { + chain_id: starknet_config.env.chain_id, + vm_resource_fee_cost: get_default_vm_resource_fee_cost(), + invoke_tx_max_n_steps: starknet_config.env.invoke_max_steps, + validate_max_n_steps: starknet_config.env.validate_max_steps, + max_recursion_depth: MAX_RECURSION_DEPTH, + fee_token_addresses: FeeTokenAddressses { + eth: starknet_config.genesis.fee_token.address, + strk: Default::default(), + }, + }; + + let simulation_flags = SimulationFlag { + skip_validate: starknet_config.disable_validate, + skip_fee_transfer: starknet_config.disable_fee, + ..Default::default() + }; + + use katana_executor::implementation::blockifier::BlockifierFactory; + let executor_factory = BlockifierFactory::new(cfg_env, simulation_flags); + + let sequencer = + Arc::new(KatanaSequencer::new(executor_factory, sequencer_config, starknet_config).await?); let NodeHandle { addr, handle, .. } = spawn(Arc::clone(&sequencer), server_config).await?; if !args.silent { diff --git a/crates/dojo-test-utils/Cargo.toml b/crates/dojo-test-utils/Cargo.toml index a69649fd02..8cdbb727bf 100644 --- a/crates/dojo-test-utils/Cargo.toml +++ b/crates/dojo-test-utils/Cargo.toml @@ -18,6 +18,7 @@ dojo-lang = { path = "../dojo-lang" } dojo-world = { path = "../dojo-world", features = [ "manifest", "migration" ] } jsonrpsee = { version = "0.16.2", features = [ "server" ] } katana-core = { path = "../katana/core" } +katana-executor = { workspace = true, features = [ "blockifier" ] } katana-primitives = { path = "../katana/primitives" } katana-rpc = { path = "../katana/rpc/rpc" } katana-rpc-api = { path = "../katana/rpc/rpc-api" } diff --git a/crates/dojo-test-utils/src/sequencer.rs b/crates/dojo-test-utils/src/sequencer.rs index 9e006c0e59..8131635541 100644 --- a/crates/dojo-test-utils/src/sequencer.rs +++ b/crates/dojo-test-utils/src/sequencer.rs @@ -2,9 +2,14 @@ use std::sync::Arc; use jsonrpsee::core::Error; pub use katana_core::backend::config::{Environment, StarknetConfig}; +use katana_core::constants::MAX_RECURSION_DEPTH; +use katana_core::env::get_default_vm_resource_fee_cost; use katana_core::sequencer::KatanaSequencer; pub use katana_core::sequencer::SequencerConfig; +use katana_executor::implementation::blockifier::BlockifierFactory; +use katana_executor::SimulationFlag; use katana_primitives::chain::ChainId; +use katana_primitives::env::{CfgEnv, FeeTokenAddressses}; use katana_rpc::config::ServerConfig; use katana_rpc::{spawn, NodeHandle}; use katana_rpc_api::ApiKind; @@ -26,13 +31,33 @@ pub struct TestSequencer { url: Url, handle: NodeHandle, account: TestAccount, - pub sequencer: Arc, + pub sequencer: Arc>, } impl TestSequencer { pub async fn start(config: SequencerConfig, starknet_config: StarknetConfig) -> Self { + let cfg_env = CfgEnv { + chain_id: starknet_config.env.chain_id, + vm_resource_fee_cost: get_default_vm_resource_fee_cost(), + invoke_tx_max_n_steps: starknet_config.env.invoke_max_steps, + validate_max_n_steps: starknet_config.env.validate_max_steps, + max_recursion_depth: MAX_RECURSION_DEPTH, + fee_token_addresses: FeeTokenAddressses { + eth: starknet_config.genesis.fee_token.address, + strk: Default::default(), + }, + }; + + let simulation_flags = SimulationFlag { + skip_validate: starknet_config.disable_validate, + skip_fee_transfer: starknet_config.disable_fee, + ..Default::default() + }; + + let executor_factory = BlockifierFactory::new(cfg_env, simulation_flags); + let sequencer = Arc::new( - KatanaSequencer::new(config, starknet_config) + KatanaSequencer::new(executor_factory, config, starknet_config) .await .expect("Failed to create sequencer"), ); diff --git a/crates/katana/core/Cargo.toml b/crates/katana/core/Cargo.toml index 7b084a45ac..2954843225 100644 --- a/crates/katana/core/Cargo.toml +++ b/crates/katana/core/Cargo.toml @@ -8,9 +8,10 @@ version.workspace = true [dependencies] katana-db = { path = "../storage/db" } -katana-executor = { path = "../executor" } +katana-executor.workspace = true katana-primitives = { path = "../primitives" } katana-provider = { path = "../storage/provider" } +katana-tasks.workspace = true anyhow.workspace = true async-trait.workspace = true @@ -19,7 +20,8 @@ cairo-lang-casm = "2.3.1" cairo-lang-starknet = "2.3.1" cairo-vm.workspace = true convert_case.workspace = true -ethers = "2.0.11" +derive_more.workspace = true +ethers = { version = "2.0.11", optional = true } flate2.workspace = true futures.workspace = true lazy_static = "1.4.0" @@ -42,5 +44,5 @@ hex = "0.4.3" tempfile = "3.8.1" [features] -messaging = [ ] +messaging = [ "ethers" ] starknet-messaging = [ ] diff --git a/crates/katana/core/src/backend/mod.rs b/crates/katana/core/src/backend/mod.rs index 02931e9440..96a37d0202 100644 --- a/crates/katana/core/src/backend/mod.rs +++ b/crates/katana/core/src/backend/mod.rs @@ -1,10 +1,11 @@ use std::sync::Arc; +use katana_executor::ExecutorFactory; use katana_primitives::block::{ Block, FinalityStatus, GasPrices, Header, PartialHeader, SealedBlockWithStatus, }; use katana_primitives::chain::ChainId; -use katana_primitives::env::{BlockEnv, CfgEnv, FeeTokenAddressses}; +use katana_primitives::env::BlockEnv; use katana_primitives::receipt::Receipt; use katana_primitives::state::StateUpdatesWithDeclaredClasses; use katana_primitives::transaction::TxWithHash; @@ -13,7 +14,6 @@ use katana_primitives::FieldElement; use katana_provider::providers::fork::ForkedProvider; use katana_provider::providers::in_memory::InMemoryProvider; use katana_provider::traits::block::{BlockHashProvider, BlockWriter}; -use katana_provider::traits::state::{StateFactoryProvider, StateProvider}; use parking_lot::RwLock; use starknet::core::types::{BlockId, BlockStatus, MaybePendingBlockWithTxHashes}; use starknet::core::utils::parse_cairo_short_string; @@ -27,12 +27,11 @@ pub mod storage; use self::config::StarknetConfig; use self::storage::Blockchain; -use crate::constants::MAX_RECURSION_DEPTH; -use crate::env::{get_default_vm_resource_fee_cost, BlockContextGenerator}; +use crate::env::BlockContextGenerator; use crate::service::block_producer::{BlockProductionError, MinedBlockOutcome}; use crate::utils::get_current_timestamp; -pub struct Backend { +pub struct Backend { /// The config used to generate the backend. pub config: StarknetConfig, /// stores all block related data in memory @@ -41,15 +40,15 @@ pub struct Backend { pub chain_id: ChainId, /// The block context generator. pub block_context_generator: RwLock, + + pub executor_factory: Arc, } -impl Backend { - pub async fn new(mut config: StarknetConfig) -> Self { +impl Backend { + pub async fn new(executor_factory: Arc, mut config: StarknetConfig) -> Self { let block_context_generator = config.block_context_generator(); - let (blockchain, chain_id): (Blockchain, ChainId) = if let Some(forked_url) = - &config.fork_rpc_url - { + let blockchain: Blockchain = if let Some(forked_url) = &config.fork_rpc_url { let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(forked_url.clone()))); let forked_chain_id = provider.chain_id().await.unwrap(); @@ -99,43 +98,25 @@ impl Backend { ) .expect("able to create forked blockchain"); - (blockchain, forked_chain_id.into()) + config.env.chain_id = forked_chain_id.into(); + blockchain } else if let Some(db_path) = &config.db_dir { - ( - Blockchain::new_with_db(db_path, &config.genesis) - .expect("able to create blockchain from db"), - config.env.chain_id, - ) + Blockchain::new_with_db(db_path, &config.genesis) + .expect("able to create blockchain from db") } else { - let blockchain = Blockchain::new_with_genesis(InMemoryProvider::new(), &config.genesis) - .expect("able to create blockchain from genesis block"); - - (blockchain, config.env.chain_id) + Blockchain::new_with_genesis(InMemoryProvider::new(), &config.genesis) + .expect("able to create blockchain from genesis block") }; Self { - chain_id, + chain_id: config.env.chain_id, blockchain, config, + executor_factory, block_context_generator: RwLock::new(block_context_generator), } } - /// Mines a new block based on the provided execution outcome. - /// This method should only be called by the - /// [IntervalBlockProducer](crate::service::block_producer::IntervalBlockProducer) when the node - /// is running in `interval` mining mode. - pub fn mine_pending_block( - &self, - block_env: &BlockEnv, - tx_receipt_pairs: Vec<(TxWithHash, Receipt)>, - state_updates: StateUpdatesWithDeclaredClasses, - ) -> Result<(MinedBlockOutcome, Box), BlockProductionError> { - let outcome = self.do_mine_block(block_env, tx_receipt_pairs, state_updates)?; - let new_state = StateFactoryProvider::latest(&self.blockchain.provider())?; - Ok((outcome, new_state)) - } - pub fn do_mine_block( &self, block_env: &BlockEnv, @@ -145,9 +126,11 @@ impl Backend { let (txs, receipts): (Vec, Vec) = tx_receipt_pairs.into_iter().unzip(); let prev_hash = BlockHashProvider::latest_hash(self.blockchain.provider())?; + let block_number = block_env.number; + let tx_count = txs.len(); let partial_header = PartialHeader { - number: block_env.number, + number: block_number, parent_hash: prev_hash, version: CURRENT_STARKNET_VERSION, timestamp: block_env.timestamp, @@ -158,10 +141,7 @@ impl Backend { }, }; - let tx_count = txs.len(); - let block_number = block_env.number; - - let header = Header::new(partial_header, block_number, FieldElement::ZERO); + let header = Header::new(partial_header, FieldElement::ZERO); let block = Block { header, body: txs }.seal(); let block = SealedBlockWithStatus { block, status: FinalityStatus::AcceptedOnL2 }; @@ -195,21 +175,6 @@ impl Backend { block_env.l1_gas_prices = self.config.env.gas_price.clone(); } - /// Retrieves the chain configuration environment values. - pub(crate) fn chain_cfg_env(&self) -> CfgEnv { - CfgEnv { - chain_id: self.chain_id, - vm_resource_fee_cost: get_default_vm_resource_fee_cost(), - invoke_tx_max_n_steps: self.config.env.invoke_max_steps, - validate_max_n_steps: self.config.env.validate_max_steps, - max_recursion_depth: MAX_RECURSION_DEPTH, - fee_token_addresses: FeeTokenAddressses { - eth: self.config.genesis.fee_token.address, - strk: Default::default(), - }, - } - } - pub fn mine_empty_block( &self, block_env: &BlockEnv, @@ -221,6 +186,9 @@ impl Backend { #[cfg(test)] mod tests { + use std::sync::Arc; + + use katana_executor::implementation::noop::NoopExecutorFactory; use katana_primitives::genesis::Genesis; use katana_provider::traits::block::{BlockNumberProvider, BlockProvider}; use katana_provider::traits::env::BlockEnvProvider; @@ -237,8 +205,8 @@ mod tests { } } - async fn create_test_backend() -> Backend { - Backend::new(create_test_starknet_config()).await + async fn create_test_backend() -> Backend { + Backend::new(Arc::new(NoopExecutorFactory::default()), create_test_starknet_config()).await } #[tokio::test] diff --git a/crates/katana/core/src/sequencer.rs b/crates/katana/core/src/sequencer.rs index 7ed57474c2..8e063604cb 100644 --- a/crates/katana/core/src/sequencer.rs +++ b/crates/katana/core/src/sequencer.rs @@ -4,16 +4,12 @@ use std::slice::Iter; use std::sync::Arc; use anyhow::Result; -use blockifier::block_context::BlockContext; -use blockifier::execution::errors::{EntryPointExecutionError, PreExecutionError}; -use blockifier::transaction::errors::TransactionExecutionError; -use katana_executor::blockifier::state::StateRefDb; -use katana_executor::blockifier::utils::{block_context_from_envs, EntryPointCall}; -use katana_executor::blockifier::PendingState; +use katana_executor::{ExecutorFactory, SimulationFlag}; use katana_primitives::block::{BlockHash, BlockHashOrNumber, BlockIdOrTag, BlockNumber}; use katana_primitives::chain::ChainId; use katana_primitives::class::{ClassHash, CompiledClass}; use katana_primitives::contract::{ContractAddress, Nonce, StorageKey, StorageValue}; +use katana_primitives::env::BlockEnv; use katana_primitives::event::{ContinuationToken, ContinuationTokenError}; use katana_primitives::receipt::Event; use katana_primitives::transaction::{ExecutableTxWithHash, TxHash, TxWithHash}; @@ -27,14 +23,14 @@ use katana_provider::traits::state::{StateFactoryProvider, StateProvider}; use katana_provider::traits::transaction::{ ReceiptProvider, TransactionProvider, TransactionsProviderExt, }; -use starknet::core::types::{BlockTag, EmittedEvent, EventsPage, FeeEstimate}; +use starknet::core::types::{BlockTag, EmittedEvent, EventsPage, FeeEstimate, PriceUnit}; use crate::backend::config::StarknetConfig; use crate::backend::contract::StarknetContract; use crate::backend::Backend; use crate::pool::TransactionPool; use crate::sequencer_error::SequencerError; -use crate::service::block_producer::{BlockProducer, BlockProducerMode}; +use crate::service::block_producer::{BlockProducer, BlockProducerMode, PendingExecutor}; #[cfg(feature = "messaging")] use crate::service::messaging::MessagingConfig; #[cfg(feature = "messaging")] @@ -51,39 +47,30 @@ pub struct SequencerConfig { pub messaging: Option, } -pub struct KatanaSequencer { +pub struct KatanaSequencer { pub config: SequencerConfig, pub pool: Arc, - pub backend: Arc, - pub block_producer: BlockProducer, + pub backend: Arc>, + pub block_producer: Arc>, } -impl KatanaSequencer { +impl KatanaSequencer { pub async fn new( + executor_factory: EF, config: SequencerConfig, starknet_config: StarknetConfig, ) -> anyhow::Result { - let backend = Arc::new(Backend::new(starknet_config).await); + let executor_factory = Arc::new(executor_factory); + let backend = Arc::new(Backend::new(executor_factory.clone(), starknet_config).await); let pool = Arc::new(TransactionPool::new()); let miner = TransactionMiner::new(pool.add_listener()); - let state = StateFactoryProvider::latest(backend.blockchain.provider()) - .map(StateRefDb::new) - .unwrap(); - let block_producer = if config.block_time.is_some() || config.no_mining { - let block_num = backend.blockchain.provider().latest_number()?; - - let mut block_env = - backend.blockchain.provider().block_env_at(block_num.into())?.unwrap(); - backend.update_block_env(&mut block_env); - let cfg_env = backend.chain_cfg_env(); - if let Some(interval) = config.block_time { - BlockProducer::interval(Arc::clone(&backend), state, interval, (block_env, cfg_env)) + BlockProducer::interval(Arc::clone(&backend), interval) } else { - BlockProducer::on_demand(Arc::clone(&backend), state, (block_env, cfg_env)) + BlockProducer::on_demand(Arc::clone(&backend)) } } else { BlockProducer::instant(Arc::clone(&backend)) @@ -96,6 +83,8 @@ impl KatanaSequencer { None }; + let block_producer = Arc::new(block_producer); + tokio::spawn(NodeService { miner, pool: Arc::clone(&pool), @@ -108,51 +97,49 @@ impl KatanaSequencer { } /// Returns the pending state if the sequencer is running in _interval_ mode. Otherwise `None`. - pub fn pending_state(&self) -> Option> { + pub fn pending_executor(&self) -> Option { match &*self.block_producer.inner.read() { BlockProducerMode::Instant(_) => None, - BlockProducerMode::Interval(producer) => Some(producer.state()), + BlockProducerMode::Interval(producer) => Some(producer.executor()), } } - pub fn block_producer(&self) -> &BlockProducer { + pub fn block_producer(&self) -> &BlockProducer { &self.block_producer } - pub fn backend(&self) -> &Backend { + pub fn backend(&self) -> &Backend { &self.backend } - pub fn block_execution_context_at( - &self, - block_id: BlockIdOrTag, - ) -> SequencerResult> { + pub fn block_env_at(&self, block_id: BlockIdOrTag) -> SequencerResult> { let provider = self.backend.blockchain.provider(); - let cfg_env = self.backend().chain_cfg_env(); - if let BlockIdOrTag::Tag(BlockTag::Pending) = block_id { - if let Some(state) = self.pending_state() { - let (block_env, _) = state.block_execution_envs(); - return Ok(Some(block_context_from_envs(&block_env, &cfg_env))); + if BlockIdOrTag::Tag(BlockTag::Pending) == block_id { + if let Some(exec) = self.pending_executor() { + return Ok(Some(exec.read().block_env())); } } - let block_num = match block_id { + match block_id { BlockIdOrTag::Tag(BlockTag::Pending) | BlockIdOrTag::Tag(BlockTag::Latest) => { - provider.latest_number()? + let num = provider.latest_number()?; + provider + .block_env_at(num.into())? + .map(Some) + .ok_or(SequencerError::BlockNotFound(block_id)) } BlockIdOrTag::Hash(hash) => provider - .block_number_by_hash(hash)? - .ok_or(SequencerError::BlockNotFound(block_id))?, - - BlockIdOrTag::Number(num) => num, - }; - - provider - .block_env_at(block_num.into())? - .map(|block_env| Some(block_context_from_envs(&block_env, &cfg_env))) - .ok_or(SequencerError::BlockNotFound(block_id)) + .block_env_at(hash.into())? + .map(Some) + .ok_or(SequencerError::BlockNotFound(block_id)), + + BlockIdOrTag::Number(num) => provider + .block_env_at(num.into())? + .map(Some) + .ok_or(SequencerError::BlockNotFound(block_id)), + } } pub fn state(&self, block_id: &BlockIdOrTag) -> SequencerResult> { @@ -165,8 +152,8 @@ impl KatanaSequencer { } BlockIdOrTag::Tag(BlockTag::Pending) => { - if let Some(state) = self.pending_state() { - Ok(Box::new(state.state.clone())) + if let Some(exec) = self.pending_executor() { + Ok(Box::new(exec.read().state())) } else { let state = StateFactoryProvider::latest(provider)?; Ok(state) @@ -196,22 +183,32 @@ impl KatanaSequencer { skip_validate: bool, ) -> SequencerResult> { let state = self.state(&block_id)?; - - let block_context = self - .block_execution_context_at(block_id)? - .ok_or_else(|| SequencerError::BlockNotFound(block_id))?; + let env = self.block_env_at(block_id)?.ok_or(SequencerError::BlockNotFound(block_id))?; + let executor = self.backend.executor_factory.with_state_and_block_env(state, env); // If the node is run with transaction validation disabled, then we should not validate // transactions when estimating the fee even if the `SKIP_VALIDATE` flag is not set. let should_validate = !(skip_validate || self.backend.config.disable_validate); + let simulation_flag = + SimulationFlag { skip_validate: !should_validate, ..Default::default() }; + + let mut estimates: Vec = Vec::with_capacity(transactions.len()); + for tx in transactions { + let result = executor.simulate(tx, simulation_flag.clone()).unwrap(); + + let overall_fee = result.actual_fee().into(); + let gas_consumed = result.gas_used().into(); + let gas_price = executor.block_env().l1_gas_prices.eth.into(); + + estimates.push(FeeEstimate { + gas_consumed, + gas_price, + overall_fee, + unit: PriceUnit::Wei, + }) + } - katana_executor::blockifier::utils::estimate_fee( - transactions.into_iter(), - block_context, - state, - should_validate, - ) - .map_err(SequencerError::TransactionExecution) + Ok(estimates) } pub fn block_hash_and_number(&self) -> SequencerResult<(BlockHash, BlockNumber)> { @@ -282,8 +279,9 @@ impl KatanaSequencer { let provider = self.backend.blockchain.provider(); let count = match block_id { - BlockIdOrTag::Tag(BlockTag::Pending) => match self.pending_state() { - Some(state) => Some(state.executed_txs.read().len() as u64), + BlockIdOrTag::Tag(BlockTag::Pending) => match self.pending_executor() { + Some(exec) => Some(exec.read().transactions().len() as u64), + None => { let hash = BlockHashProvider::latest_hash(provider)?; TransactionProvider::transaction_count_by_block(provider, hash.into())? @@ -317,40 +315,14 @@ impl KatanaSequencer { Ok(nonce) } - pub fn call( - &self, - request: EntryPointCall, - block_id: BlockIdOrTag, - ) -> SequencerResult> { - let state = self.state(&block_id)?; - - let block_context = self - .block_execution_context_at(block_id)? - .ok_or_else(|| SequencerError::BlockNotFound(block_id))?; - - let retdata = katana_executor::blockifier::utils::call(request, block_context, state) - .map_err(|e| match e { - TransactionExecutionError::ExecutionError(exe) => match exe { - EntryPointExecutionError::PreExecutionError( - PreExecutionError::UninitializedStorageAddress(addr), - ) => SequencerError::ContractNotFound(addr.into()), - _ => SequencerError::EntryPointExecution(exe), - }, - _ => SequencerError::TransactionExecution(e), - })?; - - Ok(retdata) - } - pub fn transaction(&self, hash: &TxHash) -> SequencerResult> { let tx = TransactionProvider::transaction_by_hash(self.backend.blockchain.provider(), *hash)?; let tx @ Some(_) = tx else { - return Ok(self.pending_state().as_ref().and_then(|state| { - state - .executed_txs - .read() + return Ok(self.pending_executor().as_ref().and_then(|exec| { + exec.read() + .transactions() .iter() .find_map(|tx| if tx.0.hash == *hash { Some(tx.0.clone()) } else { None }) })); @@ -491,8 +463,8 @@ impl KatanaSequencer { } pub fn has_pending_transactions(&self) -> bool { - if let Some(ref pending) = self.pending_state() { - !pending.executed_txs.read().is_empty() + if let Some(ref exec) = self.pending_executor() { + !exec.read().transactions().is_empty() } else { false } @@ -560,6 +532,7 @@ fn filter_events_by_params( #[cfg(test)] mod tests { + use katana_executor::implementation::noop::NoopExecutorFactory; use katana_provider::traits::block::BlockNumberProvider; use super::{KatanaSequencer, SequencerConfig}; @@ -567,7 +540,10 @@ mod tests { #[tokio::test] async fn init_interval_block_producer_with_correct_block_env() { + let executor_factory = NoopExecutorFactory::default(); + let sequencer = KatanaSequencer::new( + executor_factory, SequencerConfig { no_mining: true, ..Default::default() }, StarknetConfig::default(), ) @@ -577,7 +553,7 @@ mod tests { let provider = sequencer.backend.blockchain.provider(); let latest_num = provider.latest_number().unwrap(); - let producer_block_env = sequencer.pending_state().unwrap().block_execution_envs().0; + let producer_block_env = sequencer.pending_executor().unwrap().read().block_env(); assert_eq!( producer_block_env.number, diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 84e526d1c4..f04644f774 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -8,24 +8,19 @@ use std::time::Duration; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; -use katana_executor::blockifier::outcome::TxReceiptWithExecInfo; -use katana_executor::blockifier::state::{CachedStateWrapper, StateRefDb}; -use katana_executor::blockifier::utils::{ - block_context_from_envs, get_state_update_from_cached_state, -}; -use katana_executor::blockifier::{PendingState, TransactionExecutor}; -use katana_primitives::block::BlockHashOrNumber; -use katana_primitives::env::{BlockEnv, CfgEnv}; +use katana_executor::{BlockExecutor, ExecutionOutput, ExecutorFactory}; +use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader}; use katana_primitives::receipt::Receipt; -use katana_primitives::state::StateUpdatesWithDeclaredClasses; use katana_primitives::transaction::{ExecutableTxWithHash, TxWithHash}; +use katana_primitives::version::CURRENT_STARKNET_VERSION; use katana_provider::error::ProviderError; -use katana_provider::traits::block::BlockNumberProvider; +use katana_provider::traits::block::{BlockHashProvider, BlockNumberProvider}; use katana_provider::traits::env::BlockEnvProvider; use katana_provider::traits::state::StateFactoryProvider; +use katana_tasks::{BlockingTaskPool, BlockingTaskResult}; use parking_lot::RwLock; use tokio::time::{interval_at, Instant, Interval}; -use tracing::{trace, warn}; +use tracing::{error, info, trace, warn}; use crate::backend::Backend; @@ -33,68 +28,64 @@ use crate::backend::Backend; pub enum BlockProductionError { #[error(transparent)] Provider(#[from] ProviderError), + + #[error("block mining task cancelled")] + BlockMiningTaskCancelled, + + #[error("transaction execution task cancelled")] + ExecutionTaskCancelled, + + #[error("transaction execution error: {0}")] + TransactionExecutionError(#[from] katana_executor::ExecutorError), } pub struct MinedBlockOutcome { pub block_number: u64, } -type ServiceFuture = Pin + Send + Sync>>; +type ServiceFuture = Pin> + Send + Sync>>; type BlockProductionResult = Result; type BlockProductionFuture = ServiceFuture; + +type TxExecutionResult = Result; +type TxExecutionFuture = ServiceFuture; + type BlockProductionWithTxnsFuture = - ServiceFuture, MinedBlockOutcome), BlockProductionError>>; -pub type TxWithHashAndReceiptPair = (TxWithHash, Receipt); + ServiceFuture>; +pub type TxWithHashAndReceiptPairs = Vec<(TxWithHash, Receipt)>; /// The type which responsible for block production. #[must_use = "BlockProducer does nothing unless polled"] -#[derive(Clone)] -pub struct BlockProducer { +pub struct BlockProducer { /// The inner mode of mining. - pub inner: Arc>, + pub inner: RwLock>, } -impl BlockProducer { +impl BlockProducer { /// Creates a block producer that mines a new block every `interval` milliseconds. - pub fn interval( - backend: Arc, - initial_state: StateRefDb, - interval: u64, - block_exec_envs: (BlockEnv, CfgEnv), - ) -> Self { + pub fn interval(backend: Arc>, interval: u64) -> Self { Self { - inner: Arc::new(RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new( - backend, - initial_state, - interval, - block_exec_envs, - )))), + inner: RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new( + backend, interval, + ))), } } /// Creates a new block producer that will only be possible to mine by calling the /// `katana_generateBlock` RPC method. - pub fn on_demand( - backend: Arc, - initial_state: StateRefDb, - block_exec_envs: (BlockEnv, CfgEnv), - ) -> Self { + pub fn on_demand(backend: Arc>) -> Self { Self { - inner: Arc::new(RwLock::new(BlockProducerMode::Interval( - IntervalBlockProducer::new_no_mining(backend, initial_state, block_exec_envs), + inner: RwLock::new(BlockProducerMode::Interval(IntervalBlockProducer::new_no_mining( + backend, ))), } } /// Creates a block producer that mines a new block as soon as there are ready transactions in /// the transactions pool. - pub fn instant(backend: Arc) -> Self { - Self { - inner: Arc::new(RwLock::new(BlockProducerMode::Instant(InstantBlockProducer::new( - backend, - )))), - } + pub fn instant(backend: Arc>) -> Self { + Self { inner: RwLock::new(BlockProducerMode::Instant(InstantBlockProducer::new(backend))) } } pub(super) fn queue(&self, transactions: Vec) { @@ -117,18 +108,15 @@ impl BlockProducer { // Handler for the `katana_generateBlock` RPC method. pub fn force_mine(&self) { - trace!(target: "miner", "force mining"); + trace!(target: "miner", "scheduling force block mining"); let mut mode = self.inner.write(); match &mut *mode { BlockProducerMode::Instant(producer) => producer.force_mine(), BlockProducerMode::Interval(producer) => producer.force_mine(), } } -} -impl Stream for BlockProducer { - type Item = BlockProductionResult; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + pub(super) fn poll_next(&self, cx: &mut Context<'_>) -> Poll> { let mut mode = self.inner.write(); match &mut *mode { BlockProducerMode::Instant(producer) => producer.poll_next_unpin(cx), @@ -150,32 +138,37 @@ impl Stream for BlockProducer { /// block producer will execute all the transactions in the mempool and mine a new block with the /// resulting state. The block context is only updated every time a new block is mined as opposed to /// updating it when the block is opened (in _interval_ mode). -pub enum BlockProducerMode { - Interval(IntervalBlockProducer), - Instant(InstantBlockProducer), +pub enum BlockProducerMode { + Interval(IntervalBlockProducer), + Instant(InstantBlockProducer), } -pub struct IntervalBlockProducer { +#[derive(Clone, derive_more::Deref)] +pub struct PendingExecutor(#[deref] Arc>>>); + +impl PendingExecutor { + fn new(executor: Box>) -> Self { + Self(Arc::new(RwLock::new(executor))) + } +} + +pub struct IntervalBlockProducer { /// The interval at which new blocks are mined. interval: Option, - backend: Arc, + backend: Arc>, /// Single active future that mines a new block - block_mining: Option, + ongoing_mining: Option, /// Backlog of sets of transactions ready to be mined queued: VecDeque>, - /// The state of the pending block after executing all the transactions within the interval. - state: Arc, + executor: PendingExecutor, + blocking_task_spawner: BlockingTaskPool, + ongoing_execution: Option, /// Listeners notified when a new executed tx is added. - tx_execution_listeners: RwLock>>>, + tx_execution_listeners: RwLock>>, } -impl IntervalBlockProducer { - pub fn new( - backend: Arc, - db: StateRefDb, - interval: u64, - block_exec_envs: (BlockEnv, CfgEnv), - ) -> Self { +impl IntervalBlockProducer { + pub fn new(backend: Arc>, interval: u64) -> Self { let interval = { let duration = Duration::from_millis(interval); let mut interval = interval_at(Instant::now() + duration, duration); @@ -183,12 +176,24 @@ impl IntervalBlockProducer { interval }; - let state = Arc::new(PendingState::new(db, block_exec_envs.0, block_exec_envs.1)); + let provider = backend.blockchain.provider(); + + let latest_num = provider.latest_number().unwrap(); + let mut block_env = provider.block_env_at(latest_num.into()).unwrap().unwrap(); + backend.update_block_env(&mut block_env); + + let state = provider.latest().unwrap(); + let executor = backend.executor_factory.with_state_and_block_env(state, block_env); + let executor = PendingExecutor::new(executor); + + let blocking_task_spawner = BlockingTaskPool::new().unwrap(); Self { backend, - state, - block_mining: None, + executor, + ongoing_mining: None, + blocking_task_spawner, + ongoing_execution: None, interval: Some(interval), queued: VecDeque::default(), tx_execution_listeners: RwLock::new(vec![]), @@ -198,94 +203,105 @@ impl IntervalBlockProducer { /// Creates a new [IntervalBlockProducer] with no `interval`. This mode will not produce blocks /// for every fixed interval, although it will still execute all queued transactions and /// keep hold of the pending state. - pub fn new_no_mining( - backend: Arc, - db: StateRefDb, - block_exec_envs: (BlockEnv, CfgEnv), - ) -> Self { - let state = Arc::new(PendingState::new(db, block_exec_envs.0, block_exec_envs.1)); + pub fn new_no_mining(backend: Arc>) -> Self { + let provider = backend.blockchain.provider(); + + let latest_num = provider.latest_number().unwrap(); + let mut block_env = provider.block_env_at(latest_num.into()).unwrap().unwrap(); + backend.update_block_env(&mut block_env); + + let state = provider.latest().unwrap(); + let executor = backend.executor_factory.with_state_and_block_env(state, block_env); + let executor = PendingExecutor::new(executor); + + let blocking_task_spawner = BlockingTaskPool::new().unwrap(); Self { - state, backend, + executor, interval: None, - block_mining: None, + ongoing_mining: None, queued: VecDeque::default(), + blocking_task_spawner, + ongoing_execution: None, tx_execution_listeners: RwLock::new(vec![]), } } - pub fn state(&self) -> Arc { - self.state.clone() + pub fn executor(&self) -> PendingExecutor { + self.executor.clone() } /// Force mine a new block. It will only able to mine if there is no ongoing mining process. - pub fn force_mine(&self) { - if self.block_mining.is_none() { - let outcome = self.outcome(); - let _ = Self::do_mine(outcome, self.backend.clone(), self.state.clone()); - } else { - trace!(target: "miner", "unable to force mine while a mining process is running") + pub fn force_mine(&mut self) { + match Self::do_mine(self.executor.clone(), self.backend.clone()) { + Ok(outcome) => { + info!(target: "miner", "force mined block {}", outcome.block_number); + self.executor = + self.create_new_executor_for_next_block().expect("fail to create executor"); + } + Err(e) => { + error!(target: "miner", "failed to force mine: {e}"); + } } } fn do_mine( - state_updates: StateUpdatesWithDeclaredClasses, - backend: Arc, - pending_state: Arc, - ) -> BlockProductionResult { + executor: PendingExecutor, + backend: Arc>, + ) -> Result { + let executor = &mut executor.write(); + trace!(target: "miner", "creating new block"); - let (txs, _) = pending_state.take_txs_all(); - let tx_receipt_pairs = - txs.into_iter().map(|(tx, rct)| (tx, rct.receipt)).collect::>(); + let block_env = executor.block_env(); + let ExecutionOutput { states, transactions } = executor.take_execution_output()?; - let (mut block_env, cfg_env) = pending_state.block_execution_envs(); + let transactions = transactions + .into_iter() + .filter_map(|(tx, rct)| rct.map(|rct| (tx, rct))) + .collect::>(); - let (outcome, new_state) = - backend.mine_pending_block(&block_env, tx_receipt_pairs, state_updates)?; + let outcome = backend.do_mine_block(&block_env, transactions, states)?; trace!(target: "miner", "created new block: {}", outcome.block_number); - backend.update_block_env(&mut block_env); - pending_state.reset_state(new_state, block_env, cfg_env); - Ok(outcome) } - fn execute_transactions(&self, transactions: Vec) { - let txs = transactions.iter().map(TxWithHash::from); - - let block_context = block_context_from_envs( - &self.state.block_envs.read().0, - &self.state.block_envs.read().1, - ); - - let results = { - TransactionExecutor::new( - &self.state.state, - &block_context, - !self.backend.config.disable_fee, - !self.backend.config.disable_validate, - transactions.clone().into_iter(), - ) - .with_error_log() - .with_events_log() - .with_resources_log() - .zip(txs) - .filter_map(|(res, tx)| { - let Ok(info) = res else { return None }; - let receipt = TxReceiptWithExecInfo::new(&tx, info); - Some((tx, receipt)) + fn execute_transactions( + executor: PendingExecutor, + transactions: Vec, + ) -> Result { + let tx_receipt_pair = transactions + .into_iter() + .map(|tx| { + let tx_ = TxWithHash::from(&tx); + let output = executor.write().execute(tx)?; + + let receipt = output.receipt(tx_.as_ref()); + Ok((tx_, receipt)) }) - .collect::>() - }; + .collect::>()?; + + Ok(tx_receipt_pair) + } + + fn create_new_executor_for_next_block(&self) -> Result { + let backend = &self.backend; + let provider = backend.blockchain.provider(); - self.state.executed_txs.write().extend(results.clone()); - self.notify_listener(results.into_iter().map(|(tx, info)| (tx, info.receipt)).collect()); + let latest_num = provider.latest_number()?; + let updated_state = provider.latest()?; + + let mut block_env = provider.block_env_at(latest_num.into())?.unwrap(); + backend.update_block_env(&mut block_env); + + let executor = backend.executor_factory.with_state_and_block_env(updated_state, block_env); + Ok(PendingExecutor::new(executor)) } - pub fn add_listener(&self) -> Receiver> { + pub fn add_listener(&self) -> Receiver { const TX_LISTENER_BUFFER_SIZE: usize = 2048; let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); self.tx_execution_listeners.write().push(tx); @@ -293,7 +309,7 @@ impl IntervalBlockProducer { } /// notifies all listeners about the transaction - fn notify_listener(&self, txs: Vec) { + fn notify_listener(&self, txs: TxWithHashAndReceiptPairs) { let mut listener = self.tx_execution_listeners.write(); // this is basically a retain but with mut reference for n in (0..listener.len()).rev() { @@ -317,13 +333,9 @@ impl IntervalBlockProducer { } } } - - fn outcome(&self) -> StateUpdatesWithDeclaredClasses { - get_state_update_from_cached_state(&self.state.state) - } } -impl Stream for IntervalBlockProducer { +impl Stream for IntervalBlockProducer { // mined block outcome and the new state type Item = BlockProductionResult; @@ -331,32 +343,82 @@ impl Stream for IntervalBlockProducer { let pin = self.get_mut(); if let Some(interval) = &mut pin.interval { - if interval.poll_tick(cx).is_ready() && pin.block_mining.is_none() { + // mine block if the interval is over + if interval.poll_tick(cx).is_ready() && pin.ongoing_mining.is_none() { + let executor = pin.executor.clone(); let backend = pin.backend.clone(); - let outcome = pin.outcome(); - let state = pin.state.clone(); - - pin.block_mining = Some(Box::pin(async move { - tokio::task::spawn_blocking(|| Self::do_mine(outcome, backend, state)) - .await - .unwrap() - })); + let fut = pin.blocking_task_spawner.spawn(|| Self::do_mine(executor, backend)); + pin.ongoing_mining = Some(Box::pin(fut)); } } - // only execute transactions if there is no mining in progress - if !pin.queued.is_empty() && pin.block_mining.is_none() { - let transactions = pin.queued.pop_front().expect("not empty; qed"); - pin.execute_transactions(transactions); + loop { + if !pin.queued.is_empty() + && pin.ongoing_execution.is_none() + && pin.ongoing_mining.is_none() + { + let executor = pin.executor.clone(); + let transactions: Vec = + std::mem::take(&mut pin.queued).into_iter().flatten().collect(); + + let fut = pin + .blocking_task_spawner + .spawn(|| Self::execute_transactions(executor, transactions)); + + pin.ongoing_execution = Some(Box::pin(fut)); + } + + // poll the ongoing execution if any + if let Some(mut execution) = pin.ongoing_execution.take() { + if let Poll::Ready(executor) = execution.poll_unpin(cx) { + match executor { + Ok(Ok(txs)) => { + pin.notify_listener(txs); + continue; + } + + Ok(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + + Err(_) => { + return Poll::Ready(Some(Err( + BlockProductionError::ExecutionTaskCancelled, + ))); + } + } + } else { + pin.ongoing_execution = Some(execution); + } + } + + break; } - // poll the mining future - if let Some(mut mining) = pin.block_mining.take() { - // reset the executor for the next block - if let Poll::Ready(outcome) = mining.poll_unpin(cx) { - return Poll::Ready(Some(outcome)); + // poll the mining future if any + if let Some(mut mining) = pin.ongoing_mining.take() { + if let Poll::Ready(res) = mining.poll_unpin(cx) { + match res { + Ok(outcome) => { + match pin.create_new_executor_for_next_block() { + Ok(executor) => { + pin.executor = executor; + } + + Err(e) => return Poll::Ready(Some(Err(e))), + } + + return Poll::Ready(Some(outcome)); + } + + Err(_) => { + return Poll::Ready(Some(Err( + BlockProductionError::BlockMiningTaskCancelled, + ))); + } + } } else { - pin.block_mining = Some(mining) + pin.ongoing_mining = Some(mining); } } @@ -364,23 +426,26 @@ impl Stream for IntervalBlockProducer { } } -pub struct InstantBlockProducer { +pub struct InstantBlockProducer { /// Holds the backend if no block is being mined - backend: Arc, + backend: Arc>, /// Single active future that mines a new block block_mining: Option, /// Backlog of sets of transactions ready to be mined queued: VecDeque>, + + blocking_task_pool: BlockingTaskPool, /// Listeners notified when a new executed tx is added. - tx_execution_listeners: RwLock>>>, + tx_execution_listeners: RwLock>>, } -impl InstantBlockProducer { - pub fn new(backend: Arc) -> Self { +impl InstantBlockProducer { + pub fn new(backend: Arc>) -> Self { Self { backend, block_mining: None, queued: VecDeque::default(), + blocking_task_pool: BlockingTaskPool::new().unwrap(), tx_execution_listeners: RwLock::new(vec![]), } } @@ -395,58 +460,50 @@ impl InstantBlockProducer { } fn do_mine( - backend: Arc, + backend: Arc>, transactions: Vec, - ) -> Result<(Vec, MinedBlockOutcome), BlockProductionError> { + ) -> Result<(MinedBlockOutcome, TxWithHashAndReceiptPairs), BlockProductionError> { trace!(target: "miner", "creating new block"); let provider = backend.blockchain.provider(); - let cfg_env = backend.chain_cfg_env(); let latest_num = provider.latest_number()?; let mut block_env = provider.block_env_at(BlockHashOrNumber::Num(latest_num))?.unwrap(); backend.update_block_env(&mut block_env); - let block_context = block_context_from_envs(&block_env, &cfg_env); - - let latest_state = StateFactoryProvider::latest(backend.blockchain.provider())?; - let state = CachedStateWrapper::new(StateRefDb(latest_state)); - - let txs = transactions.iter().map(TxWithHash::from); - - let tx_receipt_pairs: Vec = TransactionExecutor::new( - &state, - &block_context, - !backend.config.disable_fee, - !backend.config.disable_validate, - transactions.clone().into_iter(), - ) - .with_error_log() - .with_events_log() - .with_resources_log() - .zip(txs) - .filter_map(|(res, tx)| { - if let Ok(info) = res { - let info = TxReceiptWithExecInfo::new(&tx, info); - Some((tx, info.receipt)) - } else { - None - } - }) - .collect(); + let parent_hash = provider.latest_hash()?; + let latest_state = provider.latest()?; + + let mut executor = backend.executor_factory.with_state(latest_state); + + let block = ExecutableBlock { + body: transactions, + header: PartialHeader { + parent_hash, + number: block_env.number, + timestamp: block_env.timestamp, + gas_prices: block_env.l1_gas_prices.clone(), + sequencer_address: block_env.sequencer_address, + version: CURRENT_STARKNET_VERSION, + }, + }; - let outcome = backend.do_mine_block( - &block_env, - tx_receipt_pairs.clone(), - get_state_update_from_cached_state(&state), - )?; + executor.execute_block(block)?; + + let ExecutionOutput { states, transactions } = executor.take_execution_output().unwrap(); + let tx_receipt_pairs = transactions + .into_iter() + .filter_map(|(tx, rct)| rct.map(|rct| (tx, rct))) + .collect::>(); + + let outcome = backend.do_mine_block(&block_env, tx_receipt_pairs.clone(), states)?; trace!(target: "miner", "created new block: {}", outcome.block_number); - Ok((tx_receipt_pairs, outcome)) + Ok((outcome, tx_receipt_pairs)) } - pub fn add_listener(&self) -> Receiver> { + pub fn add_listener(&self) -> Receiver { const TX_LISTENER_BUFFER_SIZE: usize = 2048; let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); self.tx_execution_listeners.write().push(tx); @@ -454,7 +511,7 @@ impl InstantBlockProducer { } /// notifies all listeners about the transaction - fn notify_listener(&self, txs: Vec) { + fn notify_listener(&self, txs: TxWithHashAndReceiptPairs) { let mut listener = self.tx_execution_listeners.write(); // this is basically a retain but with mut reference for n in (0..listener.len()).rev() { @@ -480,7 +537,7 @@ impl InstantBlockProducer { } } -impl Stream for InstantBlockProducer { +impl Stream for InstantBlockProducer { // mined block outcome and the new state type Item = Result; @@ -491,26 +548,32 @@ impl Stream for InstantBlockProducer { let transactions = pin.queued.pop_front().expect("not empty; qed"); let backend = pin.backend.clone(); - pin.block_mining = Some(Box::pin(async move { - tokio::task::spawn_blocking(|| Self::do_mine(backend, transactions)).await.unwrap() - })); + pin.block_mining = Some(Box::pin( + pin.blocking_task_pool.spawn(|| Self::do_mine(backend, transactions)), + )); } // poll the mining future if let Some(mut mining) = pin.block_mining.take() { - match mining.poll_unpin(cx) { - Poll::Ready(Ok((txs, outcome))) => { - pin.notify_listener(txs); - return Poll::Ready(Some(Ok(outcome))); - } + if let Poll::Ready(outcome) = mining.poll_unpin(cx) { + match outcome { + Ok(Ok((outcome, txs))) => { + pin.notify_listener(txs); + return Poll::Ready(Some(Ok(outcome))); + } - Poll::Ready(Err(e)) => { - return Poll::Ready(Some(Err(e))); - } + Ok(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } - Poll::Pending => { - pin.block_mining = Some(mining); + Err(_) => { + return Poll::Ready(Some(Err( + BlockProductionError::ExecutionTaskCancelled, + ))); + } } + } else { + pin.block_mining = Some(mining) } } diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 379a866dce..ea0d124904 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -4,6 +4,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use futures::{Future, FutureExt, Stream}; +use katana_executor::ExecutorFactory; use katana_primitives::block::BlockHashOrNumber; use katana_primitives::receipt::MessageToL1; use katana_primitives::transaction::{ExecutableTxWithHash, L1HandlerTx, TxHash}; @@ -20,10 +21,10 @@ type MessagingFuture = Pin + Send>>; type MessageGatheringFuture = MessagingFuture>; type MessageSettlingFuture = MessagingFuture>>; -pub struct MessagingService { +pub struct MessagingService { /// The interval at which the service will perform the messaging operations. interval: Interval, - backend: Arc, + backend: Arc>, pool: Arc, /// The messenger mode the service is running in. messenger: Arc, @@ -37,13 +38,13 @@ pub struct MessagingService { msg_send_fut: Option, } -impl MessagingService { +impl MessagingService { /// Initializes a new instance from a configuration file's path. /// Will panic on failure to avoid continuing with invalid configuration. pub async fn new( config: MessagingConfig, pool: Arc, - backend: Arc, + backend: Arc>, ) -> anyhow::Result { let gather_from_block = config.from_block; let interval = interval_from_seconds(config.interval); @@ -72,7 +73,7 @@ impl MessagingService { async fn gather_messages( messenger: Arc, pool: Arc, - backend: Arc, + backend: Arc>, from_block: u64, ) -> MessengerResult<(u64, usize)> { // 200 avoids any possible rejection from RPC with possibly lot's of messages. @@ -113,7 +114,7 @@ impl MessagingService { async fn send_messages( block_num: u64, - backend: Arc, + backend: Arc>, messenger: Arc, ) -> MessengerResult> { let Some(messages) = ReceiptProvider::receipts_by_block( @@ -167,7 +168,7 @@ pub enum MessagingOutcome { }, } -impl Stream for MessagingService { +impl Stream for MessagingService { type Item = MessagingOutcome; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/crates/katana/core/src/service/mod.rs b/crates/katana/core/src/service/mod.rs index 3b803f83da..899e15bd6b 100644 --- a/crates/katana/core/src/service/mod.rs +++ b/crates/katana/core/src/service/mod.rs @@ -7,9 +7,10 @@ use std::task::{Context, Poll}; use futures::channel::mpsc::Receiver; use futures::stream::{Fuse, Stream, StreamExt}; +use katana_executor::ExecutorFactory; use katana_primitives::transaction::ExecutableTxWithHash; use starknet::core::types::FieldElement; -use tracing::{error, trace}; +use tracing::info; use self::block_producer::BlockProducer; use crate::pool::TransactionPool; @@ -26,19 +27,19 @@ use self::messaging::{MessagingOutcome, MessagingService}; /// This service is basically an endless future that continuously polls the miner which returns /// transactions for the next block, then those transactions are handed off to the [BlockProducer] /// to construct a new block. -pub struct NodeService { +pub struct NodeService { /// the pool that holds all transactions pub(crate) pool: Arc, /// creates new blocks - pub(crate) block_producer: BlockProducer, + pub(crate) block_producer: Arc>, /// the miner responsible to select transactions from the `pool´ pub(crate) miner: TransactionMiner, /// The messaging service #[cfg(feature = "messaging")] - pub(crate) messaging: Option, + pub(crate) messaging: Option>, } -impl Future for NodeService { +impl Future for NodeService { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -49,10 +50,10 @@ impl Future for NodeService { while let Poll::Ready(Some(outcome)) = messaging.poll_next_unpin(cx) { match outcome { MessagingOutcome::Gather { msg_count, .. } => { - trace!(target: "node", "collected {msg_count} messages from settlement chain"); + info!(target: "node", "collected {msg_count} messages from settlement chain"); } MessagingOutcome::Send { msg_count, .. } => { - trace!(target: "node", "sent {msg_count} messages to the settlement chain"); + info!(target: "node", "sent {msg_count} messages to the settlement chain"); } } } @@ -61,14 +62,14 @@ impl Future for NodeService { // this drives block production and feeds new sets of ready transactions to the block // producer loop { - while let Poll::Ready(Some(res)) = pin.block_producer.poll_next_unpin(cx) { + while let Poll::Ready(Some(res)) = pin.block_producer.poll_next(cx) { match res { Ok(outcome) => { - trace!(target: "node", "mined block {}", outcome.block_number) + info!(target: "node", "mined block {}", outcome.block_number) } Err(err) => { - error!(target: "node", "failed to mine block: {err}"); + info!(target: "node", "failed to mine block: {err}"); } } } diff --git a/crates/katana/core/tests/sequencer.rs b/crates/katana/core/tests/sequencer.rs index d4d2225e82..8fa521afb2 100644 --- a/crates/katana/core/tests/sequencer.rs +++ b/crates/katana/core/tests/sequencer.rs @@ -1,6 +1,7 @@ use ethers::types::U256; use katana_core::backend::config::{Environment, StarknetConfig}; use katana_core::sequencer::{KatanaSequencer, SequencerConfig}; +use katana_executor::implementation::noop::NoopExecutorFactory; use katana_primitives::genesis::allocation::DevAllocationsGenerator; use katana_primitives::genesis::constant::DEFAULT_PREFUNDED_ACCOUNT_BALANCE; use katana_primitives::genesis::Genesis; @@ -26,9 +27,10 @@ fn create_test_sequencer_config() -> (SequencerConfig, StarknetConfig) { ) } -async fn create_test_sequencer() -> KatanaSequencer { +async fn create_test_sequencer() -> KatanaSequencer { + let executor_factory = NoopExecutorFactory::new(); let (sequencer_config, starknet_config) = create_test_sequencer_config(); - KatanaSequencer::new(sequencer_config, starknet_config).await.unwrap() + KatanaSequencer::new(executor_factory, sequencer_config, starknet_config).await.unwrap() } #[tokio::test] diff --git a/crates/katana/executor/Cargo.toml b/crates/katana/executor/Cargo.toml index 9712f80f9f..87bbcc529c 100644 --- a/crates/katana/executor/Cargo.toml +++ b/crates/katana/executor/Cargo.toml @@ -1,5 +1,5 @@ [package] -description = "Katana execution engine. This crate provides implementations for executing transactions." +description = "Katana execution engine. This crate provides abstractions and implementations for transaction execution." edition.workspace = true name = "katana-executor" version.workspace = true @@ -10,18 +10,17 @@ version.workspace = true katana-primitives.workspace = true katana-provider.workspace = true -anyhow.workspace = true convert_case.workspace = true futures.workspace = true parking_lot.workspace = true +serde_json.workspace = true starknet.workspace = true +starknet_api.workspace = true thiserror.workspace = true tracing.workspace = true # blockifier deps -blockifier.workspace = true -starknet_api.workspace = true -tokio.workspace = true +blockifier = { workspace = true, optional = true } [dev-dependencies] cairo-vm.workspace = true @@ -29,3 +28,8 @@ katana-provider.workspace = true rstest.workspace = true serde_json.workspace = true similar-asserts.workspace = true + +[features] +default = [ "blockifier" ] + +blockifier = [ "dep:blockifier", "katana-primitives/blockifier" ] diff --git a/crates/katana/executor/src/blockifier/mod.rs b/crates/katana/executor/src/blockifier/mod.rs deleted file mode 100644 index 725551c4bd..0000000000 --- a/crates/katana/executor/src/blockifier/mod.rs +++ /dev/null @@ -1,231 +0,0 @@ -pub mod outcome; -pub mod state; -pub mod transactions; -pub mod utils; - -use std::sync::Arc; - -use blockifier::block_context::BlockContext; -use blockifier::transaction::errors::TransactionExecutionError; -use blockifier::transaction::objects::TransactionExecutionInfo; -use blockifier::transaction::transaction_execution::Transaction; -use blockifier::transaction::transactions::ExecutableTransaction; -use katana_primitives::env::{BlockEnv, CfgEnv}; -use katana_primitives::transaction::{ExecutableTx, ExecutableTxWithHash, TxWithHash}; -use katana_provider::traits::state::StateProvider; -use parking_lot::RwLock; -use tracing::{trace, warn}; - -use self::outcome::TxReceiptWithExecInfo; -use self::state::{CachedStateWrapper, StateRefDb}; -use self::transactions::BlockifierTx; -use self::utils::events_from_exec_info; -use crate::blockifier::utils::{ - pretty_print_resources, trace_events, warn_message_transaction_error_exec_error, -}; - -/// The result of a transaction execution. -type TxExecutionResult = Result; - -/// A transaction executor. -/// -/// The transactions will be executed in an iterator fashion, sequentially, in the -/// exact order they are provided to the executor. The execution is done within its -/// implementation of the [`Iterator`] trait. -pub struct TransactionExecutor<'a, T> { - /// A flag to enable/disable fee charging. - charge_fee: bool, - /// The block context the transactions will be executed on. - block_context: &'a BlockContext, - /// The transactions to be executed (in the exact order they are in the iterator). - transactions: T, - /// The state the transactions will be executed on. - state: &'a CachedStateWrapper, - /// A flag to enable/disable transaction validation. - validate: bool, - - // logs flags - error_log: bool, - events_log: bool, - resources_log: bool, -} - -impl<'a, T> TransactionExecutor<'a, T> -where - T: Iterator, -{ - pub fn new( - state: &'a CachedStateWrapper, - block_context: &'a BlockContext, - charge_fee: bool, - validate: bool, - transactions: T, - ) -> Self { - Self { - state, - charge_fee, - transactions, - block_context, - validate, - error_log: false, - events_log: false, - resources_log: false, - } - } - - pub fn with_events_log(self) -> Self { - Self { events_log: true, ..self } - } - - pub fn with_error_log(self) -> Self { - Self { error_log: true, ..self } - } - - pub fn with_resources_log(self) -> Self { - Self { resources_log: true, ..self } - } - - /// A method to conveniently execute all the transactions and return their results. - pub fn execute(self) -> Vec { - self.collect() - } -} - -impl<'a, T> Iterator for TransactionExecutor<'a, T> -where - T: Iterator, -{ - type Item = TxExecutionResult; - - fn next(&mut self) -> Option { - let res = self.transactions.next().map(|tx| { - execute_tx(tx, self.state, self.block_context, self.charge_fee, self.validate) - })?; - - match res { - Ok(ref info) => { - if self.error_log { - if let Some(err) = &info.revert_error { - let formatted_err = format!("{err:?}").replace("\\n", "\n"); - warn!(target: "executor", "Transaction execution error: {formatted_err}"); - } - } - - if self.resources_log { - trace!( - target: "executor", - "Transaction resource usage: {}", - pretty_print_resources(&info.actual_resources) - ); - } - - if self.events_log { - trace_events(&events_from_exec_info(info)); - } - - Some(res) - } - - Err(ref err) => { - if self.error_log { - warn_message_transaction_error_exec_error(err); - } - - Some(res) - } - } - } -} - -fn execute_tx( - tx: ExecutableTxWithHash, - state: &CachedStateWrapper, - block_context: &BlockContext, - charge_fee: bool, - validate: bool, -) -> TxExecutionResult { - let class_declaration_params = if let ExecutableTx::Declare(tx) = tx.as_ref() { - let class_hash = tx.class_hash(); - Some((class_hash, tx.compiled_class.clone(), tx.sierra_class.clone())) - } else { - None - }; - - let res = match BlockifierTx::from(tx).0 { - Transaction::AccountTransaction(tx) => { - tx.execute(&mut state.inner(), block_context, charge_fee, validate) - } - Transaction::L1HandlerTransaction(tx) => { - tx.execute(&mut state.inner(), block_context, charge_fee, validate) - } - }; - - if res.is_ok() { - if let Some((class_hash, compiled_class, sierra_class)) = class_declaration_params { - state.class_cache.write().compiled.insert(class_hash, compiled_class); - - if let Some(sierra_class) = sierra_class { - state.class_cache.write().sierra.insert(class_hash, sierra_class); - } - } - } - - res -} - -pub type AcceptedTxPair = (TxWithHash, TxReceiptWithExecInfo); -pub type RejectedTxPair = (TxWithHash, TransactionExecutionError); - -pub struct PendingState { - /// The block context of the pending block. - pub block_envs: RwLock<(BlockEnv, CfgEnv)>, - /// The state of the pending block. - pub state: Arc, - /// The transactions that have been executed. - pub executed_txs: RwLock>, - /// The transactions that have been rejected. - pub rejected_txs: RwLock>, -} - -impl PendingState { - pub fn new(state: StateRefDb, block_env: BlockEnv, cfg_env: CfgEnv) -> Self { - Self { - block_envs: RwLock::new((block_env, cfg_env)), - state: Arc::new(CachedStateWrapper::new(state)), - executed_txs: RwLock::new(Vec::new()), - rejected_txs: RwLock::new(Vec::new()), - } - } - - pub fn reset_state(&self, state: Box, block_env: BlockEnv, cfg_env: CfgEnv) { - *self.block_envs.write() = (block_env, cfg_env); - self.state.reset_with_new_state(StateRefDb(state)); - } - - pub fn add_executed_txs(&self, transactions: Vec<(TxWithHash, TxExecutionResult)>) { - transactions.into_iter().for_each(|(tx, res)| self.add_executed_tx(tx, res)); - } - - /// Drain the pending transactions, returning the executed and rejected transactions. - pub fn take_txs_all(&self) -> (Vec, Vec) { - let executed_txs = std::mem::take(&mut *self.executed_txs.write()); - let rejected_txs = std::mem::take(&mut *self.rejected_txs.write()); - (executed_txs, rejected_txs) - } - - pub fn block_execution_envs(&self) -> (BlockEnv, CfgEnv) { - self.block_envs.read().clone() - } - - fn add_executed_tx(&self, tx: TxWithHash, execution_result: TxExecutionResult) { - match execution_result { - Ok(execution_info) => { - let receipt = TxReceiptWithExecInfo::new(&tx, execution_info); - self.executed_txs.write().push((tx, receipt)); - } - Err(err) => { - self.rejected_txs.write().push((tx, err)); - } - } - } -} diff --git a/crates/katana/executor/src/blockifier/outcome.rs b/crates/katana/executor/src/blockifier/outcome.rs deleted file mode 100644 index 72584dc7c9..0000000000 --- a/crates/katana/executor/src/blockifier/outcome.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::collections::HashMap; - -use blockifier::transaction::objects::TransactionExecutionInfo; -use katana_primitives::receipt::{ - DeclareTxReceipt, DeployAccountTxReceipt, InvokeTxReceipt, L1HandlerTxReceipt, Receipt, - TxExecutionResources, -}; -use katana_primitives::transaction::Tx; - -use super::utils::{events_from_exec_info, l2_to_l1_messages_from_exec_info}; - -#[derive(Clone)] -pub struct TxReceiptWithExecInfo { - pub receipt: Receipt, - pub execution_info: TransactionExecutionInfo, -} - -impl TxReceiptWithExecInfo { - pub fn new(tx: impl AsRef, execution_info: TransactionExecutionInfo) -> Self { - let actual_fee = execution_info.actual_fee.0; - - let events = events_from_exec_info(&execution_info); - let revert_error = execution_info.revert_error.clone(); - let messages_sent = l2_to_l1_messages_from_exec_info(&execution_info); - let actual_resources = parse_actual_resources(&execution_info.actual_resources.0); - - let receipt = match tx.as_ref() { - Tx::Invoke(_) => Receipt::Invoke(InvokeTxReceipt { - events, - actual_fee, - revert_error, - messages_sent, - execution_resources: actual_resources, - }), - - Tx::Declare(_) => Receipt::Declare(DeclareTxReceipt { - events, - actual_fee, - revert_error, - messages_sent, - execution_resources: actual_resources, - }), - - Tx::L1Handler(tx) => Receipt::L1Handler(L1HandlerTxReceipt { - events, - actual_fee, - revert_error, - messages_sent, - message_hash: tx.message_hash, - execution_resources: actual_resources, - }), - - Tx::DeployAccount(tx) => Receipt::DeployAccount(DeployAccountTxReceipt { - events, - actual_fee, - revert_error, - messages_sent, - execution_resources: actual_resources, - contract_address: tx.contract_address(), - }), - }; - - Self { receipt, execution_info } - } -} - -/// Parse the `actual resources` field from the execution info into a more structured type, -/// [`TxExecutionResources`]. -fn parse_actual_resources(resources: &HashMap) -> TxExecutionResources { - TxExecutionResources { - steps: resources.get("n_steps").copied().unwrap_or_default() as u64, - memory_holes: resources.get("memory_holes").map(|x| *x as u64), - ec_op_builtin: resources.get("ec_op_builtin").map(|x| *x as u64), - ecdsa_builtin: resources.get("ecdsa_builtin").map(|x| *x as u64), - keccak_builtin: resources.get("keccak_builtin").map(|x| *x as u64), - bitwise_builtin: resources.get("bitwise_builtin").map(|x| *x as u64), - pedersen_builtin: resources.get("pedersen_builtin").map(|x| *x as u64), - poseidon_builtin: resources.get("poseidon_builtin").map(|x| *x as u64), - range_check_builtin: resources.get("range_check_builtin").map(|x| *x as u64), - segment_arena_builtin: resources.get("segment_arena_builtin").map(|x| *x as u64), - } -} diff --git a/crates/katana/executor/src/blockifier/state.rs b/crates/katana/executor/src/blockifier/state.rs deleted file mode 100644 index 0e528b0874..0000000000 --- a/crates/katana/executor/src/blockifier/state.rs +++ /dev/null @@ -1,234 +0,0 @@ -use std::collections::HashMap; - -use blockifier::state::cached_state::{CachedState, GlobalContractCache}; -use blockifier::state::errors::StateError; -use blockifier::state::state_api::{StateReader, StateResult}; -use katana_primitives::class::{CompiledClass, FlattenedSierraClass}; -use katana_primitives::conversion::blockifier::to_class; -use katana_primitives::FieldElement; -use katana_provider::traits::contract::ContractClassProvider; -use katana_provider::traits::state::StateProvider; -use katana_provider::ProviderResult; -use parking_lot::{Mutex, RawMutex, RwLock}; -use starknet_api::core::{ClassHash, CompiledClassHash, Nonce, PatriciaKey}; -use starknet_api::hash::StarkHash; -use starknet_api::patricia_key; -use starknet_api::state::StorageKey; - -mod primitives { - pub use katana_primitives::class::{ClassHash, CompiledClassHash}; - pub use katana_primitives::contract::{ContractAddress, Nonce, StorageKey, StorageValue}; -} - -/// A state db only provide read access. -/// -/// This type implements the [`StateReader`] trait so that it can be used as a with [`CachedState`]. -pub struct StateRefDb(pub Box); - -impl StateRefDb { - pub fn new(provider: impl StateProvider + 'static) -> Self { - Self(Box::new(provider)) - } -} - -impl StateProvider for StateRefDb { - fn class_hash_of_contract( - &self, - address: primitives::ContractAddress, - ) -> ProviderResult> { - self.0.class_hash_of_contract(address) - } - - fn nonce( - &self, - address: primitives::ContractAddress, - ) -> ProviderResult> { - self.0.nonce(address) - } - - fn storage( - &self, - address: primitives::ContractAddress, - storage_key: primitives::StorageKey, - ) -> ProviderResult> { - self.0.storage(address, storage_key) - } -} - -impl ContractClassProvider for StateRefDb { - fn class(&self, hash: primitives::ClassHash) -> ProviderResult> { - self.0.class(hash) - } - - fn compiled_class_hash_of_class_hash( - &self, - hash: primitives::ClassHash, - ) -> ProviderResult> { - self.0.compiled_class_hash_of_class_hash(hash) - } - - fn sierra_class( - &self, - hash: primitives::ClassHash, - ) -> ProviderResult> { - self.0.sierra_class(hash) - } -} - -impl StateReader for StateRefDb { - fn get_nonce_at( - &mut self, - contract_address: starknet_api::core::ContractAddress, - ) -> StateResult { - StateProvider::nonce(&self.0, contract_address.into()) - .map(|n| Nonce(n.unwrap_or_default().into())) - .map_err(|e| StateError::StateReadError(e.to_string())) - } - - fn get_storage_at( - &mut self, - contract_address: starknet_api::core::ContractAddress, - key: starknet_api::state::StorageKey, - ) -> StateResult { - StateProvider::storage(&self.0, contract_address.into(), (*key.0.key()).into()) - .map(|v| v.unwrap_or_default().into()) - .map_err(|e| StateError::StateReadError(e.to_string())) - } - - fn get_class_hash_at( - &mut self, - contract_address: starknet_api::core::ContractAddress, - ) -> StateResult { - StateProvider::class_hash_of_contract(&self.0, contract_address.into()) - .map(|v| ClassHash(v.unwrap_or_default().into())) - .map_err(|e| StateError::StateReadError(e.to_string())) - } - - fn get_compiled_class_hash( - &mut self, - class_hash: starknet_api::core::ClassHash, - ) -> StateResult { - if let Some(hash) = - ContractClassProvider::compiled_class_hash_of_class_hash(&self.0, class_hash.0.into()) - .map_err(|e| StateError::StateReadError(e.to_string()))? - { - Ok(CompiledClassHash(hash.into())) - } else { - Err(StateError::UndeclaredClassHash(class_hash)) - } - } - - fn get_compiled_contract_class( - &mut self, - class_hash: starknet_api::core::ClassHash, - ) -> StateResult { - if let Some(class) = ContractClassProvider::class(&self.0, class_hash.0.into()) - .map_err(|e| StateError::StateReadError(e.to_string()))? - { - to_class(class).map_err(|e| StateError::StateReadError(e.to_string())) - } else { - Err(StateError::UndeclaredClassHash(class_hash)) - } - } -} - -#[derive(Default)] -pub struct ClassCache { - pub(crate) compiled: HashMap, - pub(crate) sierra: HashMap, -} - -pub struct CachedStateWrapper { - inner: Mutex>, - pub(crate) class_cache: RwLock, -} - -impl CachedStateWrapper { - pub fn new(db: StateRefDb) -> Self { - Self { - class_cache: RwLock::new(ClassCache::default()), - inner: Mutex::new(CachedState::new(db, GlobalContractCache::default())), - } - } - - pub(super) fn reset_with_new_state(&self, db: StateRefDb) { - *self.inner() = CachedState::new(db, GlobalContractCache::default()); - let mut lock = self.class_cache.write(); - lock.compiled.clear(); - lock.sierra.clear(); - } - - pub fn inner( - &self, - ) -> parking_lot::lock_api::MutexGuard<'_, RawMutex, CachedState> { - self.inner.lock() - } -} - -impl ContractClassProvider for CachedStateWrapper { - fn class(&self, hash: primitives::ClassHash) -> ProviderResult> { - if let res @ Some(_) = self.class_cache.read().compiled.get(&hash).cloned() { - Ok(res) - } else { - self.inner().state.class(hash) - } - } - - fn compiled_class_hash_of_class_hash( - &self, - hash: primitives::ClassHash, - ) -> ProviderResult> { - let Ok(hash) = self.inner().get_compiled_class_hash(ClassHash(hash.into())) else { - return Ok(None); - }; - Ok(Some(hash.0.into())) - } - - fn sierra_class( - &self, - hash: primitives::ClassHash, - ) -> ProviderResult> { - if let Some(class) = self.class_cache.read().sierra.get(&hash) { - Ok(Some(class.clone())) - } else { - self.inner.lock().state.0.sierra_class(hash) - } - } -} - -impl StateProvider for CachedStateWrapper { - fn storage( - &self, - address: primitives::ContractAddress, - storage_key: primitives::StorageKey, - ) -> ProviderResult> { - let Ok(value) = - self.inner().get_storage_at(address.into(), StorageKey(patricia_key!(storage_key))) - else { - return Ok(None); - }; - Ok(Some(value.into())) - } - - fn nonce( - &self, - address: primitives::ContractAddress, - ) -> ProviderResult> { - let Ok(nonce) = self.inner().get_nonce_at(address.into()) else { - return Ok(None); - }; - Ok(Some(nonce.0.into())) - } - - fn class_hash_of_contract( - &self, - address: primitives::ContractAddress, - ) -> ProviderResult> { - let Ok(hash) = self.inner().get_class_hash_at(address.into()) else { - return Ok(None); - }; - - let hash = hash.0.into(); - if hash == FieldElement::ZERO { Ok(None) } else { Ok(Some(hash)) } - } -} diff --git a/crates/katana/executor/src/blockifier/transactions.rs b/crates/katana/executor/src/blockifier/transactions.rs deleted file mode 100644 index 53c3ea2f59..0000000000 --- a/crates/katana/executor/src/blockifier/transactions.rs +++ /dev/null @@ -1,246 +0,0 @@ -use std::collections::BTreeMap; -use std::sync::Arc; - -use ::blockifier::transaction::transaction_execution::Transaction; -use ::blockifier::transaction::transactions::{DeployAccountTransaction, InvokeTransaction}; -use blockifier::transaction::account_transaction::AccountTransaction; -use blockifier::transaction::transactions::{DeclareTransaction, L1HandlerTransaction}; -use katana_primitives::conversion::blockifier::to_class; -use katana_primitives::transaction::{ - DeclareTx, DeployAccountTx, ExecutableTx, ExecutableTxWithHash, InvokeTx, -}; -use starknet_api::core::{ClassHash, CompiledClassHash, EntryPointSelector, Nonce}; -use starknet_api::data_availability::DataAvailabilityMode; -use starknet_api::transaction::{ - AccountDeploymentData, Calldata, ContractAddressSalt, - DeclareTransaction as ApiDeclareTransaction, DeclareTransactionV0V1, DeclareTransactionV2, - DeclareTransactionV3, DeployAccountTransaction as ApiDeployAccountTransaction, - DeployAccountTransactionV1, DeployAccountTransactionV3, Fee, - InvokeTransaction as ApiInvokeTransaction, PaymasterData, Resource, ResourceBounds, - ResourceBoundsMapping, Tip, TransactionHash, TransactionSignature, TransactionVersion, -}; - -/// A newtype wrapper for execution transaction used in `blockifier`. -pub struct BlockifierTx(pub(super) ::blockifier::transaction::transaction_execution::Transaction); - -impl From for BlockifierTx { - fn from(value: ExecutableTxWithHash) -> Self { - let hash = value.hash; - - let tx = match value.transaction { - ExecutableTx::Invoke(tx) => match tx { - InvokeTx::V1(tx) => { - let calldata = tx.calldata.into_iter().map(|f| f.into()).collect(); - let signature = tx.signature.into_iter().map(|f| f.into()).collect(); - - Transaction::AccountTransaction(AccountTransaction::Invoke(InvokeTransaction { - tx: ApiInvokeTransaction::V1( - starknet_api::transaction::InvokeTransactionV1 { - max_fee: Fee(tx.max_fee), - nonce: Nonce(tx.nonce.into()), - sender_address: tx.sender_address.into(), - signature: TransactionSignature(signature), - calldata: Calldata(Arc::new(calldata)), - }, - ), - tx_hash: TransactionHash(hash.into()), - only_query: false, - })) - } - - InvokeTx::V3(tx) => { - let calldata = tx.calldata.into_iter().map(|f| f.into()).collect(); - let signature = tx.signature.into_iter().map(|f| f.into()).collect(); - - let paymaster_data = tx.paymaster_data.into_iter().map(|f| f.into()).collect(); - let account_deploy_data = - tx.account_deployment_data.into_iter().map(|f| f.into()).collect(); - let fee_data_availability_mode = to_api_da_mode(tx.fee_data_availability_mode); - let nonce_data_availability_mode = - to_api_da_mode(tx.nonce_data_availability_mode); - - Transaction::AccountTransaction(AccountTransaction::Invoke(InvokeTransaction { - tx: ApiInvokeTransaction::V3( - starknet_api::transaction::InvokeTransactionV3 { - tip: Tip(tx.tip), - nonce: Nonce(tx.nonce.into()), - sender_address: tx.sender_address.into(), - signature: TransactionSignature(signature), - calldata: Calldata(Arc::new(calldata)), - paymaster_data: PaymasterData(paymaster_data), - account_deployment_data: AccountDeploymentData(account_deploy_data), - fee_data_availability_mode, - nonce_data_availability_mode, - resource_bounds: to_api_resource_bounds(tx.resource_bounds), - }, - ), - tx_hash: TransactionHash(hash.into()), - only_query: false, - })) - } - }, - - ExecutableTx::DeployAccount(tx) => match tx { - DeployAccountTx::V1(tx) => { - let calldata = tx.constructor_calldata.into_iter().map(|f| f.into()).collect(); - let signature = tx.signature.into_iter().map(|f| f.into()).collect(); - let salt = ContractAddressSalt(tx.contract_address_salt.into()); - - Transaction::AccountTransaction(AccountTransaction::DeployAccount( - DeployAccountTransaction { - contract_address: tx.contract_address.into(), - tx: ApiDeployAccountTransaction::V1(DeployAccountTransactionV1 { - max_fee: Fee(tx.max_fee), - nonce: Nonce(tx.nonce.into()), - signature: TransactionSignature(signature), - class_hash: ClassHash(tx.class_hash.into()), - constructor_calldata: Calldata(Arc::new(calldata)), - contract_address_salt: salt, - }), - tx_hash: TransactionHash(hash.into()), - only_query: false, - }, - )) - } - - DeployAccountTx::V3(tx) => { - let calldata = tx.constructor_calldata.into_iter().map(|f| f.into()).collect(); - let signature = tx.signature.into_iter().map(|f| f.into()).collect(); - let salt = ContractAddressSalt(tx.contract_address_salt.into()); - - let paymaster_data = tx.paymaster_data.into_iter().map(|f| f.into()).collect(); - let fee_data_availability_mode = to_api_da_mode(tx.fee_data_availability_mode); - let nonce_data_availability_mode = - to_api_da_mode(tx.nonce_data_availability_mode); - - Transaction::AccountTransaction(AccountTransaction::DeployAccount( - DeployAccountTransaction { - contract_address: tx.contract_address.into(), - tx: ApiDeployAccountTransaction::V3(DeployAccountTransactionV3 { - tip: Tip(tx.tip), - nonce: Nonce(tx.nonce.into()), - signature: TransactionSignature(signature), - class_hash: ClassHash(tx.class_hash.into()), - constructor_calldata: Calldata(Arc::new(calldata)), - contract_address_salt: salt, - paymaster_data: PaymasterData(paymaster_data), - fee_data_availability_mode, - nonce_data_availability_mode, - resource_bounds: to_api_resource_bounds(tx.resource_bounds), - }), - tx_hash: TransactionHash(hash.into()), - only_query: false, - }, - )) - } - }, - - ExecutableTx::Declare(tx) => { - let contract_class = tx.compiled_class; - - let tx = match tx.transaction { - DeclareTx::V1(tx) => { - let signature = tx.signature.into_iter().map(|f| f.into()).collect(); - - ApiDeclareTransaction::V1(DeclareTransactionV0V1 { - max_fee: Fee(tx.max_fee), - nonce: Nonce(tx.nonce.into()), - sender_address: tx.sender_address.into(), - signature: TransactionSignature(signature), - class_hash: ClassHash(tx.class_hash.into()), - }) - } - - DeclareTx::V2(tx) => { - let signature = tx.signature.into_iter().map(|f| f.into()).collect(); - - ApiDeclareTransaction::V2(DeclareTransactionV2 { - max_fee: Fee(tx.max_fee), - nonce: Nonce(tx.nonce.into()), - sender_address: tx.sender_address.into(), - signature: TransactionSignature(signature), - class_hash: ClassHash(tx.class_hash.into()), - compiled_class_hash: CompiledClassHash(tx.compiled_class_hash.into()), - }) - } - - DeclareTx::V3(tx) => { - let signature = tx.signature.into_iter().map(|f| f.into()).collect(); - - let paymaster_data = - tx.paymaster_data.into_iter().map(|f| f.into()).collect(); - let fee_data_availability_mode = - to_api_da_mode(tx.fee_data_availability_mode); - let nonce_data_availability_mode = - to_api_da_mode(tx.nonce_data_availability_mode); - let account_deploy_data = - tx.account_deployment_data.into_iter().map(|f| f.into()).collect(); - - ApiDeclareTransaction::V3(DeclareTransactionV3 { - tip: Tip(tx.tip), - nonce: Nonce(tx.nonce.into()), - sender_address: tx.sender_address.into(), - signature: TransactionSignature(signature), - class_hash: ClassHash(tx.class_hash.into()), - account_deployment_data: AccountDeploymentData(account_deploy_data), - compiled_class_hash: CompiledClassHash(tx.compiled_class_hash.into()), - paymaster_data: PaymasterData(paymaster_data), - fee_data_availability_mode, - nonce_data_availability_mode, - resource_bounds: to_api_resource_bounds(tx.resource_bounds), - }) - } - }; - - let tx = DeclareTransaction::new( - tx, - TransactionHash(hash.into()), - to_class(contract_class).unwrap(), - ) - .expect("class mismatch"); - Transaction::AccountTransaction(AccountTransaction::Declare(tx)) - } - - ExecutableTx::L1Handler(tx) => { - let calldata = tx.calldata.into_iter().map(|f| f.into()).collect(); - - Transaction::L1HandlerTransaction(L1HandlerTransaction { - paid_fee_on_l1: Fee(tx.paid_fee_on_l1), - tx: starknet_api::transaction::L1HandlerTransaction { - nonce: Nonce(tx.nonce.into()), - calldata: Calldata(Arc::new(calldata)), - version: TransactionVersion(1u128.into()), - contract_address: tx.contract_address.into(), - entry_point_selector: EntryPointSelector(tx.entry_point_selector.into()), - }, - tx_hash: TransactionHash(hash.into()), - }) - } - }; - - Self(tx) - } -} - -fn to_api_da_mode(mode: starknet::core::types::DataAvailabilityMode) -> DataAvailabilityMode { - match mode { - starknet::core::types::DataAvailabilityMode::L1 => DataAvailabilityMode::L1, - starknet::core::types::DataAvailabilityMode::L2 => DataAvailabilityMode::L2, - } -} - -fn to_api_resource_bounds( - resource_bounds: starknet::core::types::ResourceBoundsMapping, -) -> ResourceBoundsMapping { - let l1_gas = ResourceBounds { - max_amount: resource_bounds.l1_gas.max_amount, - max_price_per_unit: resource_bounds.l1_gas.max_price_per_unit, - }; - - let l2_gas = ResourceBounds { - max_amount: resource_bounds.l2_gas.max_amount, - max_price_per_unit: resource_bounds.l2_gas.max_price_per_unit, - }; - - ResourceBoundsMapping(BTreeMap::from([(Resource::L1Gas, l1_gas), (Resource::L2Gas, l2_gas)])) -} diff --git a/crates/katana/executor/src/blockifier/utils.rs b/crates/katana/executor/src/blockifier/utils.rs deleted file mode 100644 index 73b4f7e3c2..0000000000 --- a/crates/katana/executor/src/blockifier/utils.rs +++ /dev/null @@ -1,403 +0,0 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use ::blockifier::block_context::BlockContext; -use ::blockifier::execution::call_info::CallInfo; -use ::blockifier::execution::common_hints::ExecutionMode; -use ::blockifier::execution::entry_point::{ - CallEntryPoint, EntryPointExecutionContext, ExecutionResources, -}; -use ::blockifier::execution::errors::EntryPointExecutionError; -use ::blockifier::state::cached_state::{CachedState, GlobalContractCache, MutRefState}; -use ::blockifier::transaction::objects::AccountTransactionContext; -use blockifier::block_context::{BlockInfo, ChainInfo, FeeTokenAddresses, GasPrices}; -use blockifier::fee::fee_utils::{calculate_l1_gas_by_vm_usage, extract_l1_gas_and_vm_usage}; -use blockifier::state::state_api::State; -use blockifier::transaction::errors::TransactionExecutionError; -use blockifier::transaction::objects::{ - DeprecatedAccountTransactionContext, ResourcesMapping, TransactionExecutionInfo, -}; -use convert_case::{Case, Casing}; -use katana_primitives::contract::ContractAddress; -use katana_primitives::env::{BlockEnv, CfgEnv}; -use katana_primitives::receipt::{Event, MessageToL1}; -use katana_primitives::state::{StateUpdates, StateUpdatesWithDeclaredClasses}; -use katana_primitives::transaction::ExecutableTxWithHash; -use katana_primitives::FieldElement; -use katana_provider::traits::contract::ContractClassProvider; -use katana_provider::traits::state::StateProvider; -use starknet::core::types::{FeeEstimate, PriceUnit}; -use starknet::core::utils::parse_cairo_short_string; -use starknet::macros::felt; -use starknet_api::block::{BlockNumber, BlockTimestamp}; -use starknet_api::core::EntryPointSelector; -use starknet_api::transaction::Calldata; -use tracing::trace; - -use super::state::{CachedStateWrapper, StateRefDb}; -use super::TransactionExecutor; - -#[derive(Debug)] -pub struct EntryPointCall { - /// The address of the contract whose function you're calling. - pub contract_address: ContractAddress, - /// The input to the function. - pub calldata: Vec, - /// The function selector. - pub entry_point_selector: FieldElement, -} - -/// Perform a function call on a contract and retrieve the return values. -pub fn call( - request: EntryPointCall, - block_context: BlockContext, - state: Box, -) -> Result, TransactionExecutionError> { - let res = raw_call(request, block_context, state, 1_000_000_000)?; - let retdata = res.execution.retdata.0; - let retdata = retdata.into_iter().map(|f| f.into()).collect::>(); - Ok(retdata) -} - -/// Estimate the execution fee for a list of transactions. -pub fn estimate_fee( - transactions: impl Iterator, - block_context: BlockContext, - state: Box, - validate: bool, -) -> Result, TransactionExecutionError> { - let state = CachedStateWrapper::new(StateRefDb(state)); - let results = TransactionExecutor::new(&state, &block_context, true, validate, transactions) - .with_error_log() - .execute(); - - results - .into_iter() - .map(|res| { - let exec_info = res?; - - if exec_info.revert_error.is_some() { - return Err(TransactionExecutionError::ExecutionError( - EntryPointExecutionError::ExecutionFailed { error_data: Default::default() }, - )); - } - - calculate_execution_fee(&block_context, &exec_info) - }) - .collect::, _>>() -} - -/// Perform a raw entrypoint call of a contract. -pub fn raw_call( - request: EntryPointCall, - block_context: BlockContext, - state: Box, - initial_gas: u64, -) -> Result { - let mut state = CachedState::new(StateRefDb(state), GlobalContractCache::default()); - let mut state = CachedState::new(MutRefState::new(&mut state), GlobalContractCache::default()); - - let call = CallEntryPoint { - initial_gas, - storage_address: request.contract_address.into(), - entry_point_selector: EntryPointSelector(request.entry_point_selector.into()), - calldata: Calldata(Arc::new(request.calldata.into_iter().map(|f| f.into()).collect())), - ..Default::default() - }; - - // TODO: this must be false if fees are disabled I assume. - let limit_steps_by_resources = true; - - // Now, the max step is not given directly to this function. - // It's computed by a new function max_steps, and it tooks the values - // from teh block context itself instead of the input give. - // https://github.com/starkware-libs/blockifier/blob/51b343fe38139a309a69b2482f4b484e8caa5edf/crates/blockifier/src/execution/entry_point.rs#L165 - // The blockifier patch must be adjusted to modify this function to return - // the limit we have into the block context without min applied: - // https://github.com/starkware-libs/blockifier/blob/51b343fe38139a309a69b2482f4b484e8caa5edf/crates/blockifier/src/execution/entry_point.rs#L215 - call.execute( - &mut state, - &mut ExecutionResources::default(), - &mut EntryPointExecutionContext::new( - &block_context, - // TODO: the current does not have Default, let's use the old one for now. - &AccountTransactionContext::Deprecated(DeprecatedAccountTransactionContext::default()), - ExecutionMode::Execute, - limit_steps_by_resources, - )?, - ) - .map_err(TransactionExecutionError::ExecutionError) -} - -/// Calculate the fee of a transaction execution. -pub fn calculate_execution_fee( - block_context: &BlockContext, - exec_info: &TransactionExecutionInfo, -) -> Result { - let (l1_gas_usage, vm_resources) = extract_l1_gas_and_vm_usage(&exec_info.actual_resources); - let l1_gas_by_vm_usage = calculate_l1_gas_by_vm_usage(block_context, &vm_resources)?; - - let total_l1_gas_usage = l1_gas_usage as f64 + l1_gas_by_vm_usage; - - // Gas prices are now in two currencies: eth and strk. - // For now let's only consider eth to be compatible with V2. - // https://github.com/starkware-libs/blockifier/blob/51b343fe38139a309a69b2482f4b484e8caa5edf/crates/blockifier/src/block_context.rs#L19C26-L19C26 - // https://github.com/starkware-libs/blockifier/blob/51b343fe38139a309a69b2482f4b484e8caa5edf/crates/blockifier/src/block_context.rs#L49 - let gas_price = block_context.block_info.gas_prices.eth_l1_gas_price as u64; - let gas_consumed = total_l1_gas_usage.ceil() as u64; - let overall_fee = total_l1_gas_usage.ceil() as u64 * gas_price; - - Ok(FeeEstimate { - gas_price: gas_price.into(), - gas_consumed: gas_consumed.into(), - overall_fee: overall_fee.into(), - unit: PriceUnit::Wei, - }) -} - -/// Create a block context from the chain environment values. -pub fn block_context_from_envs(block_env: &BlockEnv, cfg_env: &CfgEnv) -> BlockContext { - let fee_token_addresses = FeeTokenAddresses { - eth_fee_token_address: cfg_env.fee_token_addresses.eth.into(), - strk_fee_token_address: ContractAddress(felt!("0xb00b5")).into(), - }; - - let gas_prices = GasPrices { - eth_l1_gas_price: block_env.l1_gas_prices.eth, - strk_l1_gas_price: block_env.l1_gas_prices.strk, - eth_l1_data_gas_price: 0, - strk_l1_data_gas_price: 0, - }; - - BlockContext { - block_info: BlockInfo { - gas_prices, - block_number: BlockNumber(block_env.number), - block_timestamp: BlockTimestamp(block_env.timestamp), - sequencer_address: block_env.sequencer_address.into(), - vm_resource_fee_cost: cfg_env.vm_resource_fee_cost.clone().into(), - validate_max_n_steps: cfg_env.validate_max_n_steps, - invoke_tx_max_n_steps: cfg_env.invoke_tx_max_n_steps, - max_recursion_depth: cfg_env.max_recursion_depth, - use_kzg_da: false, - }, - chain_info: ChainInfo { fee_token_addresses, chain_id: cfg_env.chain_id.into() }, - } -} - -pub(crate) fn warn_message_transaction_error_exec_error(err: &TransactionExecutionError) { - match err { - TransactionExecutionError::ExecutionError(ref eperr) => match eperr { - EntryPointExecutionError::ExecutionFailed { error_data } => { - let mut reasons: Vec = vec![]; - error_data.iter().for_each(|felt| { - if let Ok(s) = parse_cairo_short_string(&FieldElement::from(*felt)) { - reasons.push(s); - } - }); - - tracing::warn!(target: "executor", - "Transaction validation error: {}", reasons.join(" ")); - } - _ => tracing::warn!(target: "executor", - "Transaction validation error: {:?}", err), - }, - _ => tracing::warn!(target: "executor", - "Transaction validation error: {:?}", err), - } -} - -pub(crate) fn pretty_print_resources(resources: &ResourcesMapping) -> String { - let mut mapped_strings: Vec<_> = resources - .0 - .iter() - .filter_map(|(k, v)| match k.as_str() { - "l1_gas_usage" => Some(format!("L1 Gas: {}", v)), - "range_check_builtin" => Some(format!("Range Checks: {}", v)), - "ecdsa_builtin" => Some(format!("ECDSA: {}", v)), - "n_steps" => None, - "pedersen_builtin" => Some(format!("Pedersen: {}", v)), - "bitwise_builtin" => Some(format!("Bitwise: {}", v)), - "keccak_builtin" => Some(format!("Keccak: {}", v)), - _ => Some(format!("{}: {}", k.to_case(Case::Title), v)), - }) - .collect::>(); - - // Sort the strings alphabetically - mapped_strings.sort(); - - // Prepend "Steps" if it exists, so it is always first - if let Some(steps) = resources.0.get("n_steps") { - mapped_strings.insert(0, format!("Steps: {}", steps)); - } - - mapped_strings.join(" | ") -} - -pub fn get_state_update_from_cached_state( - state: &CachedStateWrapper, -) -> StateUpdatesWithDeclaredClasses { - let state_diff = state.inner().to_state_diff(); - - let declared_sierra_classes = state.class_cache.read().sierra.clone(); - - let declared_compiled_classes = - state_diff - .class_hash_to_compiled_class_hash - .iter() - .map(|(class_hash, _)| { - let class = - state.class(class_hash.0.into()).unwrap().expect("must exist if declared"); - (class_hash.0.into(), class) - }) - .collect::>(); - - let nonce_updates = - state_diff - .address_to_nonce - .into_iter() - .map(|(key, value)| (key.into(), value.0.into())) - .collect::>(); - - let storage_changes = state_diff - .storage_updates - .into_iter() - .map(|(addr, entries)| { - let entries = entries - .into_iter() - .map(|(k, v)| ((*k.0.key()).into(), v.into())) - .collect::>(); - - (addr.into(), entries) - }) - .collect::>(); - - let contract_updates = - state_diff - .address_to_class_hash - .into_iter() - .map(|(key, value)| (key.into(), value.0.into())) - .collect::>(); - - let declared_classes = - state_diff - .class_hash_to_compiled_class_hash - .into_iter() - .map(|(key, value)| (key.0.into(), value.0.into())) - .collect::>(); - - StateUpdatesWithDeclaredClasses { - declared_sierra_classes, - declared_compiled_classes, - state_updates: StateUpdates { - nonce_updates, - storage_updates: storage_changes, - contract_updates, - declared_classes, - }, - } -} - -pub(super) fn trace_events(events: &[Event]) { - for e in events { - let formatted_keys = - e.keys.iter().map(|k| format!("{k:#x}")).collect::>().join(", "); - - trace!(target: "executor", "Event emitted keys=[{}]", formatted_keys); - } -} - -pub(super) fn events_from_exec_info(execution_info: &TransactionExecutionInfo) -> Vec { - let mut events: Vec = vec![]; - - fn get_events_recursively(call_info: &CallInfo) -> Vec { - let mut events: Vec = vec![]; - - events.extend(call_info.execution.events.iter().map(|e| Event { - from_address: call_info.call.storage_address.into(), - data: e.event.data.0.iter().map(|d| (*d).into()).collect(), - keys: e.event.keys.iter().map(|k| k.0.into()).collect(), - })); - - call_info.inner_calls.iter().for_each(|call| { - events.extend(get_events_recursively(call)); - }); - - events - } - - if let Some(ref call) = execution_info.validate_call_info { - events.extend(get_events_recursively(call)); - } - - if let Some(ref call) = execution_info.execute_call_info { - events.extend(get_events_recursively(call)); - } - - if let Some(ref call) = execution_info.fee_transfer_call_info { - events.extend(get_events_recursively(call)); - } - - events -} - -pub(super) fn l2_to_l1_messages_from_exec_info( - execution_info: &TransactionExecutionInfo, -) -> Vec { - let mut messages = vec![]; - - fn get_messages_recursively(info: &CallInfo) -> Vec { - let mut messages = vec![]; - - // By default, `from_address` must correspond to the contract address that - // is sending the message. In the case of library calls, `code_address` is `None`, - // we then use the `caller_address` instead (which can also be an account). - let from_address = if let Some(code_address) = info.call.code_address { - *code_address.0.key() - } else { - *info.call.caller_address.0.key() - }; - - messages.extend(info.execution.l2_to_l1_messages.iter().map(|m| MessageToL1 { - to_address: - FieldElement::from_byte_slice_be(m.message.to_address.0.as_bytes()).unwrap(), - from_address: ContractAddress(from_address.into()), - payload: m.message.payload.0.iter().map(|p| (*p).into()).collect(), - })); - - info.inner_calls.iter().for_each(|call| { - messages.extend(get_messages_recursively(call)); - }); - - messages - } - - if let Some(ref info) = execution_info.validate_call_info { - messages.extend(get_messages_recursively(info)); - } - - if let Some(ref info) = execution_info.execute_call_info { - messages.extend(get_messages_recursively(info)); - } - - if let Some(ref info) = execution_info.fee_transfer_call_info { - messages.extend(get_messages_recursively(info)); - } - - messages -} diff --git a/crates/katana/executor/src/implementation/mod.rs b/crates/katana/executor/src/implementation/mod.rs index 22486ef41d..1ce7601f62 100644 --- a/crates/katana/executor/src/implementation/mod.rs +++ b/crates/katana/executor/src/implementation/mod.rs @@ -1 +1,3 @@ pub mod blockifier; + +pub mod noop; diff --git a/crates/katana/executor/src/implementation/noop.rs b/crates/katana/executor/src/implementation/noop.rs new file mode 100644 index 0000000000..652873f0da --- /dev/null +++ b/crates/katana/executor/src/implementation/noop.rs @@ -0,0 +1,177 @@ +use katana_primitives::block::ExecutableBlock; +use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass}; +use katana_primitives::contract::{ContractAddress, Nonce, StorageKey, StorageValue}; +use katana_primitives::env::{BlockEnv, CfgEnv}; +use katana_primitives::receipt::{InvokeTxReceipt, Receipt}; +use katana_primitives::transaction::{ExecutableTxWithHash, Tx, TxWithHash}; +use katana_primitives::FieldElement; +use katana_provider::traits::contract::ContractClassProvider; +use katana_provider::traits::state::StateProvider; +use katana_provider::ProviderResult; + +use crate::{ + BlockExecutor, EntryPointCall, ExecutionOutput, ExecutorFactory, ExecutorResult, + SimulationFlag, TransactionExecutionOutput, TransactionExecutor, +}; + +/// A no-op executor factory. Creates an executor that does nothing. +#[derive(Debug, Default)] +pub struct NoopExecutorFactory { + cfg: CfgEnv, +} + +impl NoopExecutorFactory { + /// Create a new no-op executor factory. + pub fn new() -> Self { + Self::default() + } +} + +impl ExecutorFactory for NoopExecutorFactory { + fn with_state<'a, P>(&self, state: P) -> Box + 'a> + where + P: StateProvider + 'a, + { + let _ = state; + Box::::default() + } + + fn with_state_and_block_env<'a, P>( + &self, + state: P, + block_env: BlockEnv, + ) -> Box + 'a> + where + P: StateProvider + 'a, + { + let _ = state; + let _ = block_env; + Box::new(NoopExecutor { block_env }) + } + + fn cfg(&self) -> &CfgEnv { + &self.cfg + } +} + +#[derive(Debug, Default)] +struct NoopExecutor { + block_env: BlockEnv, +} + +impl TransactionExecutor for NoopExecutor { + fn execute( + &mut self, + tx: ExecutableTxWithHash, + ) -> ExecutorResult> { + let _ = tx; + Ok(Box::new(NoopTransactionExecutionOutput)) + } + + fn simulate( + &self, + tx: ExecutableTxWithHash, + flags: SimulationFlag, + ) -> ExecutorResult> { + let _ = tx; + let _ = flags; + Ok(Box::new(NoopTransactionExecutionOutput)) + } + + fn call(&self, call: EntryPointCall, initial_gas: u128) -> ExecutorResult> { + let _ = call; + let _ = initial_gas; + Ok(vec![]) + } +} + +impl<'a> BlockExecutor<'a> for NoopExecutor { + fn execute_block(&mut self, block: ExecutableBlock) -> ExecutorResult<()> { + let _ = block; + Ok(()) + } + + fn take_execution_output(&mut self) -> ExecutorResult { + Ok(ExecutionOutput::default()) + } + + fn state(&self) -> Box { + Box::new(NoopStateProvider) + } + + fn transactions(&self) -> &[(TxWithHash, Option)] { + &[] + } + + fn block_env(&self) -> BlockEnv { + self.block_env.clone() + } +} + +struct NoopTransactionExecutionOutput; + +impl TransactionExecutionOutput for NoopTransactionExecutionOutput { + fn receipt(&self, tx: &Tx) -> Receipt { + let _ = tx; + Receipt::Invoke(InvokeTxReceipt::default()) + } + + fn actual_fee(&self) -> u128 { + 0 + } + + fn gas_used(&self) -> u128 { + 0 + } + + fn revert_error(&self) -> Option<&str> { + None + } +} + +struct NoopStateProvider; + +impl ContractClassProvider for NoopStateProvider { + fn class(&self, hash: ClassHash) -> ProviderResult> { + let _ = hash; + Ok(None) + } + + fn compiled_class_hash_of_class_hash( + &self, + hash: ClassHash, + ) -> ProviderResult> { + let _ = hash; + Ok(None) + } + + fn sierra_class(&self, hash: ClassHash) -> ProviderResult> { + let _ = hash; + Ok(None) + } +} + +impl StateProvider for NoopStateProvider { + fn class_hash_of_contract( + &self, + address: ContractAddress, + ) -> ProviderResult> { + let _ = address; + Ok(None) + } + + fn nonce(&self, address: ContractAddress) -> ProviderResult> { + let _ = address; + Ok(None) + } + + fn storage( + &self, + address: ContractAddress, + storage_key: StorageKey, + ) -> ProviderResult> { + let _ = address; + let _ = storage_key; + Ok(None) + } +} diff --git a/crates/katana/executor/src/lib.rs b/crates/katana/executor/src/lib.rs index 809f18c2a0..d1232a3b8c 100644 --- a/crates/katana/executor/src/lib.rs +++ b/crates/katana/executor/src/lib.rs @@ -1,5 +1,3 @@ -pub mod blockifier; - pub mod implementation; mod abstraction; diff --git a/crates/katana/primitives/src/block.rs b/crates/katana/primitives/src/block.rs index f3bf6ac29f..671489c55d 100644 --- a/crates/katana/primitives/src/block.rs +++ b/crates/katana/primitives/src/block.rs @@ -72,14 +72,10 @@ pub struct Header { } impl Header { - pub fn new( - partial_header: PartialHeader, - number: BlockNumber, - state_root: FieldElement, - ) -> Self { + pub fn new(partial_header: PartialHeader, state_root: FieldElement) -> Self { Self { - number, state_root, + number: partial_header.number, version: partial_header.version, timestamp: partial_header.timestamp, gas_prices: partial_header.gas_prices, diff --git a/crates/katana/rpc/rpc-types/src/error/starknet.rs b/crates/katana/rpc/rpc-types/src/error/starknet.rs index 564f704bb3..03afcdc9e1 100644 --- a/crates/katana/rpc/rpc-types/src/error/starknet.rs +++ b/crates/katana/rpc/rpc-types/src/error/starknet.rs @@ -33,6 +33,13 @@ pub enum StarknetApiError { InvalidContinuationToken, #[error("Contract error")] ContractError { revert_error: String }, + #[error("Transaction execution error")] + TransactionExecutionError { + /// The index of the first transaction failing in a sequence of given transactions. + transaction_index: usize, + /// The revert error with the execution trace up to the point of failure. + execution_error: String, + }, #[error("Invalid contract class")] InvalidContractClass, #[error("Class already declared")] @@ -70,7 +77,7 @@ pub enum StarknetApiError { } impl StarknetApiError { - fn code(&self) -> i32 { + pub fn code(&self) -> i32 { match self { StarknetApiError::FailedToReceiveTxn => 1, StarknetApiError::ContractNotFound => 20, @@ -86,6 +93,7 @@ impl StarknetApiError { StarknetApiError::TooManyKeysInFilter => 34, StarknetApiError::FailedToFetchPendingTransactions => 38, StarknetApiError::ContractError { .. } => 40, + StarknetApiError::TransactionExecutionError { .. } => 41, StarknetApiError::InvalidContractClass => 50, StarknetApiError::ClassAlreadyDeclared => 51, StarknetApiError::InvalidTransactionNonce => 52, diff --git a/crates/katana/rpc/rpc/src/dev.rs b/crates/katana/rpc/rpc/src/dev.rs index 5475bc5290..e90290c1c5 100644 --- a/crates/katana/rpc/rpc/src/dev.rs +++ b/crates/katana/rpc/rpc/src/dev.rs @@ -2,22 +2,23 @@ use std::sync::Arc; use jsonrpsee::core::{async_trait, Error}; use katana_core::sequencer::KatanaSequencer; +use katana_executor::ExecutorFactory; use katana_primitives::FieldElement; use katana_rpc_api::dev::DevApiServer; use katana_rpc_types::error::katana::KatanaApiError; -pub struct DevApi { - sequencer: Arc, +pub struct DevApi { + sequencer: Arc>, } -impl DevApi { - pub fn new(sequencer: Arc) -> Self { +impl DevApi { + pub fn new(sequencer: Arc>) -> Self { Self { sequencer } } } #[async_trait] -impl DevApiServer for DevApi { +impl DevApiServer for DevApi { async fn generate_block(&self) -> Result<(), Error> { self.sequencer.block_producer().force_mine(); Ok(()) diff --git a/crates/katana/rpc/rpc/src/katana.rs b/crates/katana/rpc/rpc/src/katana.rs index 3dffbfdef4..46e5f61c68 100644 --- a/crates/katana/rpc/rpc/src/katana.rs +++ b/crates/katana/rpc/rpc/src/katana.rs @@ -2,21 +2,22 @@ use std::sync::Arc; use jsonrpsee::core::{async_trait, Error}; use katana_core::sequencer::KatanaSequencer; +use katana_executor::ExecutorFactory; use katana_rpc_api::katana::KatanaApiServer; use katana_rpc_types::account::Account; -pub struct KatanaApi { - sequencer: Arc, +pub struct KatanaApi { + sequencer: Arc>, } -impl KatanaApi { - pub fn new(sequencer: Arc) -> Self { +impl KatanaApi { + pub fn new(sequencer: Arc>) -> Self { Self { sequencer } } } #[async_trait] -impl KatanaApiServer for KatanaApi { +impl KatanaApiServer for KatanaApi { async fn predeployed_accounts(&self) -> Result, Error> { Ok(self .sequencer diff --git a/crates/katana/rpc/rpc/src/lib.rs b/crates/katana/rpc/rpc/src/lib.rs index 2a536b8aa2..93d6352bbc 100644 --- a/crates/katana/rpc/rpc/src/lib.rs +++ b/crates/katana/rpc/rpc/src/lib.rs @@ -18,6 +18,7 @@ use jsonrpsee::tracing::debug; use jsonrpsee::types::Params; use jsonrpsee::RpcModule; use katana_core::sequencer::KatanaSequencer; +use katana_executor::ExecutorFactory; use katana_rpc_api::dev::DevApiServer; use katana_rpc_api::katana::KatanaApiServer; use katana_rpc_api::starknet::StarknetApiServer; @@ -30,7 +31,10 @@ use crate::katana::KatanaApi; use crate::starknet::StarknetApi; use crate::torii::ToriiApi; -pub async fn spawn(sequencer: Arc, config: ServerConfig) -> Result { +pub async fn spawn( + sequencer: Arc>, + config: ServerConfig, +) -> Result { let mut methods = RpcModule::new(()); methods.register_method("health", |_, _| Ok(serde_json::json!({ "health": true })))?; diff --git a/crates/katana/rpc/rpc/src/starknet.rs b/crates/katana/rpc/rpc/src/starknet.rs index d42a88d25c..8d80fac292 100644 --- a/crates/katana/rpc/rpc/src/starknet.rs +++ b/crates/katana/rpc/rpc/src/starknet.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use jsonrpsee::core::{async_trait, Error, RpcResult}; use katana_core::backend::contract::StarknetContract; use katana_core::sequencer::KatanaSequencer; -use katana_executor::blockifier::utils::EntryPointCall; +use katana_executor::{EntryPointCall, ExecutorFactory, SimulationFlag}; use katana_primitives::block::{BlockHashOrNumber, BlockIdOrTag, FinalityStatus, PartialHeader}; use katana_primitives::conversion::rpc::legacy_inner_to_rpc_class; use katana_primitives::transaction::{ExecutableTx, ExecutableTxWithHash, TxHash}; @@ -30,20 +30,25 @@ use katana_rpc_types::transaction::{ use katana_rpc_types::{ContractClass, FeeEstimate, FeltAsHex, FunctionCall, SimulationFlags}; use katana_rpc_types_builder::ReceiptBuilder; use katana_tasks::{BlockingTaskPool, TokioTaskSpawner}; -use starknet::core::types::{BlockTag, TransactionExecutionStatus, TransactionStatus}; +use starknet::core::types::{BlockTag, PriceUnit, TransactionExecutionStatus, TransactionStatus}; -#[derive(Clone)] -pub struct StarknetApi { - inner: Arc, +pub struct StarknetApi { + inner: Arc>, } -struct StarknetApiInner { - sequencer: Arc, +impl Clone for StarknetApi { + fn clone(&self) -> Self { + Self { inner: Arc::clone(&self.inner) } + } +} + +struct StarknetApiInner { + sequencer: Arc>, blocking_task_pool: BlockingTaskPool, } -impl StarknetApi { - pub fn new(sequencer: Arc) -> Self { +impl StarknetApi { + pub fn new(sequencer: Arc>) -> Self { let blocking_task_pool = BlockingTaskPool::new().expect("failed to create blocking task pool"); Self { inner: Arc::new(StarknetApiInner { sequencer, blocking_task_pool }) } @@ -68,7 +73,7 @@ impl StarknetApi { } } #[async_trait] -impl StarknetApiServer for StarknetApi { +impl StarknetApiServer for StarknetApi { async fn chain_id(&self) -> RpcResult { Ok(FieldElement::from(self.inner.sequencer.chain_id()).into()) } @@ -158,10 +163,9 @@ impl StarknetApiServer for StarknetApi { let provider = this.inner.sequencer.backend.blockchain.provider(); if BlockIdOrTag::Tag(BlockTag::Pending) == block_id { - if let Some(pending_state) = this.inner.sequencer.pending_state() { - let block_env = pending_state.block_envs.read().0.clone(); - let latest_hash = - BlockHashProvider::latest_hash(provider).map_err(StarknetApiError::from)?; + if let Some(executor) = this.inner.sequencer.pending_executor() { + let block_env = executor.read().block_env(); + let latest_hash = provider.latest_hash().map_err(StarknetApiError::from)?; let gas_prices = block_env.l1_gas_prices.clone(); @@ -169,14 +173,14 @@ impl StarknetApiServer for StarknetApi { number: block_env.number, gas_prices, parent_hash: latest_hash, - version: CURRENT_STARKNET_VERSION, timestamp: block_env.timestamp, + version: CURRENT_STARKNET_VERSION, sequencer_address: block_env.sequencer_address, }; - let transactions = pending_state - .executed_txs + let transactions = executor .read() + .transactions() .iter() .map(|(tx, _)| tx.hash) .collect::>(); @@ -209,12 +213,13 @@ impl StarknetApiServer for StarknetApi { self.on_io_blocking_task(move |this| { // TEMP: have to handle pending tag independently for now let tx = if BlockIdOrTag::Tag(BlockTag::Pending) == block_id { - let Some(pending_state) = this.inner.sequencer.pending_state() else { + let Some(executor) = this.inner.sequencer.pending_executor() else { return Err(StarknetApiError::BlockNotFound.into()); }; - let pending_txs = pending_state.executed_txs.read(); - pending_txs.iter().nth(index as usize).map(|(tx, _)| tx.clone()) + let executor = executor.read(); + let pending_txs = executor.transactions(); + pending_txs.get(index as usize).map(|(tx, _)| tx.clone()) } else { let provider = &this.inner.sequencer.backend.blockchain.provider(); @@ -237,10 +242,9 @@ impl StarknetApiServer for StarknetApi { let provider = this.inner.sequencer.backend.blockchain.provider(); if BlockIdOrTag::Tag(BlockTag::Pending) == block_id { - if let Some(pending_state) = this.inner.sequencer.pending_state() { - let block_env = pending_state.block_envs.read().0.clone(); - let latest_hash = - BlockHashProvider::latest_hash(provider).map_err(StarknetApiError::from)?; + if let Some(executor) = this.inner.sequencer.pending_executor() { + let block_env = executor.read().block_env(); + let latest_hash = provider.latest_hash().map_err(StarknetApiError::from)?; let gas_prices = block_env.l1_gas_prices.clone(); @@ -253,9 +257,9 @@ impl StarknetApiServer for StarknetApi { sequencer_address: block_env.sequencer_address, }; - let transactions = pending_state - .executed_txs + let transactions = executor .read() + .transactions() .iter() .map(|(tx, _)| tx.clone()) .collect::>(); @@ -320,13 +324,12 @@ impl StarknetApiServer for StarknetApi { Some(receipt) => Ok(MaybePendingTxReceipt::Receipt(receipt)), None => { - let pending_receipt = this.inner.sequencer.pending_state().and_then(|s| { - s.executed_txs - .read() - .iter() - .find(|(tx, _)| tx.hash == transaction_hash) - .map(|(_, rct)| rct.receipt.clone()) - }); + let pending_receipt = + this.inner.sequencer.pending_executor().and_then(|executor| { + executor.read().transactions().iter().find_map(|(tx, rct)| { + if tx.hash == transaction_hash { rct.clone() } else { None } + }) + }); let Some(pending_receipt) = pending_receipt else { return Err(StarknetApiError::TxnHashNotFound.into()); @@ -420,9 +423,23 @@ impl StarknetApiServer for StarknetApi { entry_point_selector: request.entry_point_selector, }; - let res = - this.inner.sequencer.call(request, block_id).map_err(StarknetApiError::from)?; - Ok(res.into_iter().map(|v| v.into()).collect()) + let sequencer = &this.inner.sequencer; + + // get the state and block env at the specified block for function call execution + let state = sequencer.state(&block_id).map_err(StarknetApiError::from)?; + let env = sequencer + .block_env_at(block_id) + .map_err(StarknetApiError::from)? + .ok_or(StarknetApiError::BlockNotFound)?; + + let executor = sequencer.backend.executor_factory.with_state_and_block_env(state, env); + + match executor.call(request, 1_000_000_000) { + Ok(retdata) => Ok(retdata.into_iter().map(|v| v.into()).collect()), + Err(err) => Err(Error::from(StarknetApiError::ContractError { + revert_error: err.to_string(), + })), + } }) .await } @@ -476,7 +493,8 @@ impl StarknetApiServer for StarknetApi { block_id: BlockIdOrTag, ) -> RpcResult> { self.on_cpu_blocking_task(move |this| { - let chain_id = this.inner.sequencer.chain_id(); + let sequencer = &this.inner.sequencer; + let chain_id = sequencer.chain_id(); let transactions = request .into_iter() @@ -513,13 +531,51 @@ impl StarknetApiServer for StarknetApi { let skip_validate = simulation_flags.iter().any(|flag| flag == &SimulationFlags::SkipValidate); - let res = this - .inner - .sequencer - .estimate_fee(transactions, block_id, skip_validate) - .map_err(StarknetApiError::from)?; + // If the node is run with transaction validation disabled, then we should not validate + // transactions when estimating the fee even if the `SKIP_VALIDATE` flag is not set. + let should_validate = + !(skip_validate || this.inner.sequencer.backend.config.disable_validate); - Ok(res) + let simulation_flag = + SimulationFlag { skip_validate: !should_validate, ..Default::default() }; + + // get the state and block env at the specified block for execution + let state = sequencer.state(&block_id).map_err(StarknetApiError::from)?; + let env = sequencer + .block_env_at(block_id) + .map_err(StarknetApiError::from)? + .ok_or(StarknetApiError::BlockNotFound)?; + + // create the executor + let executor = sequencer.backend.executor_factory.with_state_and_block_env(state, env); + + let mut estimates: Vec = Vec::with_capacity(transactions.len()); + + for (i, tx) in transactions.into_iter().enumerate() { + match executor.simulate(tx, simulation_flag.clone()) { + Ok(output) => { + let overall_fee = output.actual_fee().into(); + let gas_consumed = output.gas_used().into(); + let gas_price = executor.block_env().l1_gas_prices.eth.into(); + + estimates.push(FeeEstimate { + gas_consumed, + gas_price, + overall_fee, + unit: PriceUnit::Wei, + }) + } + + Err(err) => { + return Err(Error::from(StarknetApiError::TransactionExecutionError { + transaction_index: i, + execution_error: err.to_string(), + })); + } + } + } + + Ok(estimates) }) .await } @@ -635,16 +691,19 @@ impl StarknetApiServer for StarknetApi { } } - let pending_state = this.inner.sequencer.pending_state(); - let state = pending_state.ok_or(StarknetApiError::TxnHashNotFound)?; - let executed_txs = state.executed_txs.read(); + let pending_executor = + this.inner.sequencer.pending_executor().ok_or(StarknetApiError::TxnHashNotFound)?; + let pending_executor = pending_executor.read(); + + let pending_txs = pending_executor.transactions(); + // filter only the valid executed transactions (the ones with a receipt) + let mut executed_txs = pending_txs.iter().filter(|(_, rct)| rct.is_some()); // attemps to find in the valid transactions list first (executed_txs) // if not found, then search in the rejected transactions list (rejected_txs) if let Some(is_reverted) = executed_txs - .iter() .find(|(tx, _)| tx.hash == transaction_hash) - .map(|(_, rct)| rct.receipt.is_reverted()) + .map(|(_, rct)| rct.as_ref().is_some_and(|r| r.is_reverted())) { let exec_status = if is_reverted { TransactionExecutionStatus::Reverted @@ -654,10 +713,11 @@ impl StarknetApiServer for StarknetApi { Ok(TransactionStatus::AcceptedOnL2(exec_status)) } else { - let rejected_txs = state.rejected_txs.read(); + // we filter out the executed transactions and only take the rejected ones (the ones + // with no receipt) + let mut rejected_txs = pending_txs.iter().filter(|(_, rct)| rct.is_none()); rejected_txs - .iter() .find(|(tx, _)| tx.hash == transaction_hash) .map(|_| TransactionStatus::Rejected) .ok_or(Error::from(StarknetApiError::TxnHashNotFound)) diff --git a/crates/katana/rpc/rpc/src/torii.rs b/crates/katana/rpc/rpc/src/torii.rs index a3a253f4af..137bf22488 100644 --- a/crates/katana/rpc/rpc/src/torii.rs +++ b/crates/katana/rpc/rpc/src/torii.rs @@ -4,6 +4,7 @@ use futures::StreamExt; use jsonrpsee::core::{async_trait, RpcResult}; use katana_core::sequencer::KatanaSequencer; use katana_core::service::block_producer::BlockProducerMode; +use katana_executor::ExecutorFactory; use katana_primitives::block::BlockHashOrNumber; use katana_provider::traits::transaction::TransactionProvider; use katana_rpc_api::torii::ToriiApiServer; @@ -15,13 +16,18 @@ use katana_tasks::TokioTaskSpawner; const MAX_PAGE_SIZE: usize = 100; -#[derive(Clone)] -pub struct ToriiApi { - sequencer: Arc, +pub struct ToriiApi { + sequencer: Arc>, } -impl ToriiApi { - pub fn new(sequencer: Arc) -> Self { +impl Clone for ToriiApi { + fn clone(&self) -> Self { + Self { sequencer: self.sequencer.clone() } + } +} + +impl ToriiApi { + pub fn new(sequencer: Arc>) -> Self { Self { sequencer } } @@ -36,7 +42,7 @@ impl ToriiApi { } #[async_trait] -impl ToriiApiServer for ToriiApi { +impl ToriiApiServer for ToriiApi { async fn get_transactions( &self, cursor: TransactionsPageCursor, @@ -93,25 +99,27 @@ impl ToriiApiServer for ToriiApi { } } - if let Some(pending_state) = this.sequencer.pending_state() { + if let Some(pending_executor) = this.sequencer.pending_executor() { let remaining = MAX_PAGE_SIZE - transactions.len(); // If cursor is in the pending block if cursor.block_number == latest_block_number + 1 { - let pending_transactions = pending_state - .executed_txs + let pending_transactions = pending_executor .read() + .transactions() .iter() .skip(cursor.transaction_index as usize) .take(remaining) - .map(|(tx, info)| { - ( - tx.clone(), - MaybePendingTxReceipt::Pending(PendingTxReceipt::new( - tx.hash, - info.receipt.clone(), - )), - ) + .filter_map(|(tx, receipt)| { + receipt.as_ref().map(|rct| { + ( + tx.clone(), + MaybePendingTxReceipt::Pending(PendingTxReceipt::new( + tx.hash, + rct.clone(), + )), + ) + }) }) .collect::>(); @@ -135,19 +143,21 @@ impl ToriiApiServer for ToriiApi { next_cursor.transaction_index += pending_transactions.len() as u64; transactions.extend(pending_transactions); } else { - let pending_transactions = pending_state - .executed_txs + let pending_transactions = pending_executor .read() + .transactions() .iter() .take(remaining) - .map(|(tx, info)| { - ( - tx.clone(), - MaybePendingTxReceipt::Pending(PendingTxReceipt::new( - tx.hash, - info.receipt.clone(), - )), - ) + .filter_map(|(tx, receipt)| { + receipt.as_ref().map(|rct| { + ( + tx.clone(), + MaybePendingTxReceipt::Pending(PendingTxReceipt::new( + tx.hash, + rct.clone(), + )), + ) + }) }) .collect::>(); next_cursor.block_number += 1; diff --git a/crates/katana/storage/provider/src/error.rs b/crates/katana/storage/provider/src/error.rs index f4120f03e9..6fe75d4c6d 100644 --- a/crates/katana/storage/provider/src/error.rs +++ b/crates/katana/storage/provider/src/error.rs @@ -105,4 +105,8 @@ pub enum ProviderError { #[cfg(feature = "fork")] #[error(transparent)] ForkedBackend(#[from] ForkedBackendError), + + /// Any error that is not covered by the other variants. + #[error("soemthing went wrong: {0}")] + Other(String), } diff --git a/crates/katana/tasks/src/lib.rs b/crates/katana/tasks/src/lib.rs index 148630ade0..a51cee743f 100644 --- a/crates/katana/tasks/src/lib.rs +++ b/crates/katana/tasks/src/lib.rs @@ -64,7 +64,7 @@ impl TokioTaskSpawner { #[error("Failed to initialize blocking thread pool: {0}")] pub struct BlockingTaskPoolInitError(rayon::ThreadPoolBuildError); -type BlockingTaskResult = Result>; +pub type BlockingTaskResult = Result>; #[derive(Debug)] #[must_use = "BlockingTaskHandle does nothing unless polled"] @@ -76,7 +76,7 @@ impl Future for BlockingTaskHandle { fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { match Pin::new(&mut self.get_mut().0).poll(cx) { Poll::Ready(Ok(res)) => Poll::Ready(res), - Poll::Ready(Err(_)) => panic!("blocking task cancelled"), + Poll::Ready(Err(cancelled)) => Poll::Ready(Err(Box::new(cancelled))), Poll::Pending => Poll::Pending, } }