diff --git a/Cargo.lock b/Cargo.lock index 4fc010cebd9..6f677643d86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10882,6 +10882,7 @@ dependencies = [ "serde", "starknet_api", "starknet_batcher_types", + "starknet_class_manager_types", "starknet_consensus", "starknet_consensus_orchestrator", "starknet_infra_utils", @@ -10923,6 +10924,7 @@ dependencies = [ "starknet-types-core", "starknet_api", "starknet_batcher_types", + "starknet_class_manager_types", "starknet_consensus", "starknet_infra_utils", "starknet_state_sync_types", diff --git a/crates/starknet_consensus_manager/Cargo.toml b/crates/starknet_consensus_manager/Cargo.toml index 005959d1e07..b3d91000925 100644 --- a/crates/starknet_consensus_manager/Cargo.toml +++ b/crates/starknet_consensus_manager/Cargo.toml @@ -16,6 +16,7 @@ papyrus_protobuf.workspace = true serde.workspace = true starknet_api.workspace = true starknet_batcher_types.workspace = true +starknet_class_manager_types.workspace = true starknet_consensus.workspace = true starknet_consensus_orchestrator.workspace = true starknet_infra_utils.workspace = true diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs index 0efe81c67eb..5e793fe2c8a 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager.rs @@ -11,6 +11,7 @@ use papyrus_protobuf::consensus::{HeightAndRound, ProposalPart, StreamMessage, V use starknet_api::block::BlockNumber; use starknet_batcher_types::batcher_types::RevertBlockInput; use starknet_batcher_types::communication::SharedBatcherClient; +use starknet_class_manager_types::SharedClassManagerClient; use starknet_consensus::stream_handler::StreamHandler; use starknet_consensus::types::ConsensusError; use starknet_consensus_orchestrator::cende::CendeAmbassador; @@ -33,6 +34,7 @@ pub struct ConsensusManager { pub config: ConsensusManagerConfig, pub batcher_client: SharedBatcherClient, pub state_sync_client: SharedStateSyncClient, + pub class_manager_client: SharedClassManagerClient, } impl ConsensusManager { @@ -40,8 +42,9 @@ impl ConsensusManager { config: ConsensusManagerConfig, batcher_client: SharedBatcherClient, state_sync_client: SharedStateSyncClient, + class_manager_client: SharedClassManagerClient, ) -> Self { - Self { config, batcher_client, state_sync_client } + Self { config, batcher_client, state_sync_client, class_manager_client } } pub async fn run(&self) -> Result<(), ConsensusError> { @@ -94,6 +97,7 @@ impl ConsensusManager { let context = SequencerConsensusContext::new( self.config.context_config.clone(), + Arc::clone(&self.class_manager_client), Arc::clone(&self.state_sync_client), Arc::clone(&self.batcher_client), outbound_internal_sender, @@ -179,8 +183,9 @@ pub fn create_consensus_manager( config: ConsensusManagerConfig, batcher_client: SharedBatcherClient, state_sync_client: SharedStateSyncClient, + class_manager_client: SharedClassManagerClient, ) -> ConsensusManager { - ConsensusManager::new(config, batcher_client, state_sync_client) + ConsensusManager::new(config, batcher_client, state_sync_client, class_manager_client) } #[async_trait] diff --git a/crates/starknet_consensus_manager/src/consensus_manager_test.rs b/crates/starknet_consensus_manager/src/consensus_manager_test.rs index 3eb8fd01578..b60d122bc9a 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager_test.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager_test.rs @@ -5,6 +5,7 @@ use rstest::rstest; use starknet_api::block::BlockNumber; use starknet_batcher_types::batcher_types::{GetHeightResponse, RevertBlockInput}; use starknet_batcher_types::communication::MockBatcherClient; +use starknet_class_manager_types::EmptyClassManagerClient; use starknet_state_sync_types::communication::MockStateSyncClient; use tokio::time::{timeout, Duration}; @@ -41,6 +42,7 @@ async fn revert_batcher_blocks() { manager_config, Arc::new(mock_batcher_client), Arc::new(MockStateSyncClient::new()), + Arc::new(EmptyClassManagerClient), ); // TODO(Shahak, dvir): try to solve this better (the test will take 100 milliseconds to run). @@ -68,6 +70,7 @@ async fn revert_with_invalid_height_panics(#[case] revert_up_to_and_including: B consensus_manager_config, Arc::new(mock_batcher), Arc::new(MockStateSyncClient::new()), + Arc::new(EmptyClassManagerClient), ); consensus_manager.run().await.unwrap(); @@ -86,6 +89,7 @@ async fn no_reverts_without_config() { manager_config, Arc::new(mock_batcher), Arc::new(MockStateSyncClient::new()), + Arc::new(EmptyClassManagerClient), ); // TODO(Shahak, dvir): try to solve this better (the test will take 100 milliseconds to run). diff --git a/crates/starknet_consensus_orchestrator/Cargo.toml b/crates/starknet_consensus_orchestrator/Cargo.toml index c57774bfc0d..c76d6c60677 100644 --- a/crates/starknet_consensus_orchestrator/Cargo.toml +++ b/crates/starknet_consensus_orchestrator/Cargo.toml @@ -24,6 +24,7 @@ serde_json = { workspace = true, features = ["arbitrary_precision"] } starknet-types-core.workspace = true starknet_api.workspace = true starknet_batcher_types = { workspace = true, features = ["testing"] } +starknet_class_manager_types.workspace = true starknet_consensus.workspace = true starknet_infra_utils.workspace = true starknet_state_sync_types = { workspace = true, features = ["testing"] } diff --git a/crates/starknet_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/starknet_consensus_orchestrator/src/sequencer_consensus_context.rs index 4deb18e809a..4eca5e54f23 100644 --- a/crates/starknet_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/starknet_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -35,9 +35,9 @@ use starknet_api::block::{ GasPrices, NonzeroGasPrice, }; -use starknet_api::core::{ChainId, ContractAddress, SequencerContractAddress}; -use starknet_api::executable_transaction::Transaction as ExecutableTransaction; -use starknet_api::transaction::{Transaction, TransactionHash}; +use starknet_api::consensus_transaction::InternalConsensusTransaction; +use starknet_api::core::{ContractAddress, SequencerContractAddress}; +use starknet_api::transaction::TransactionHash; use starknet_batcher_types::batcher_types::{ DecisionReachedInput, DecisionReachedResponse, @@ -52,6 +52,11 @@ use starknet_batcher_types::batcher_types::{ ValidateBlockInput, }; use starknet_batcher_types::communication::BatcherClient; +use starknet_class_manager_types::transaction_converter::{ + TransactionConverter, + TransactionConverterTrait, +}; +use starknet_class_manager_types::SharedClassManagerClient; use starknet_consensus::types::{ ConsensusContext, ConsensusError, @@ -88,8 +93,10 @@ const TEMPORARY_GAS_PRICES: GasPrices = GasPrices { // {height: {proposal_id: (content, [proposal_ids])}} // Note that multiple proposals IDs can be associated with the same content, but we only need to // store one of them. -type HeightToIdToContent = - BTreeMap, ProposalId)>>; +type HeightToIdToContent = BTreeMap< + BlockNumber, + HashMap, ProposalId)>, +>; type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver); const CHANNEL_SIZE: usize = 100; @@ -112,7 +119,8 @@ const BUILD_PROPOSAL_MARGIN: Duration = Duration::from_millis(1000); const VALIDATE_PROPOSAL_MARGIN: Duration = Duration::from_secs(10); pub struct SequencerConsensusContext { - config: ContextConfig, + // TODO(Shahak): change this into a dynamic TransactionConverterTrait. + transaction_converter: TransactionConverter, state_sync_client: SharedStateSyncClient, batcher: Arc, validators: Vec, @@ -146,6 +154,7 @@ pub struct SequencerConsensusContext { impl SequencerConsensusContext { pub fn new( config: ContextConfig, + class_manager_client: SharedClassManagerClient, state_sync_client: SharedStateSyncClient, batcher: Arc, outbound_proposal_sender: mpsc::Sender<(HeightAndRound, mpsc::Receiver)>, @@ -154,7 +163,10 @@ impl SequencerConsensusContext { ) -> Self { let num_validators = config.num_validators; Self { - config, + transaction_converter: TransactionConverter::new( + class_manager_client, + config.chain_id.clone(), + ), state_sync_client, batcher, outbound_proposal_sender, @@ -215,6 +227,7 @@ impl ConsensusContext for SequencerConsensusContext { .await .expect("Failed to send proposal receiver"); let gas_prices = self.gas_prices(); + let transaction_converter = self.transaction_converter.clone(); info!(?proposal_init, ?timeout, %proposal_id, "Building proposal"); let handle = tokio::spawn( @@ -229,6 +242,7 @@ impl ConsensusContext for SequencerConsensusContext { proposal_id, cende_write_success, gas_prices, + transaction_converter, ) .await; } @@ -376,6 +390,13 @@ impl ConsensusContext for SequencerConsensusContext { // TODO(dvir): pass here real `BlobParameters` info. // TODO(dvir): when passing here the correct `BlobParameters`, also test that // `prepare_blob_for_next_height` is called with the correct parameters. + let transactions = futures::future::join_all(transactions.into_iter().map(|tx| { + self.transaction_converter.convert_internal_consensus_tx_to_executable_tx(tx) + })) + .await + .into_iter().collect::, _>>() + // TODO(shahak): Do not panic here. + .expect("Failed converting transactions for cende"); self.cende_ambassador .prepare_blob_for_next_height(BlobParameters { // TODO(dvir): use the real `BlockInfo` when consensus will save it. @@ -463,8 +484,8 @@ impl SequencerConsensusContext { let cancel_token = CancellationToken::new(); let cancel_token_clone = cancel_token.clone(); let batcher = Arc::clone(&self.batcher); + let transaction_converter = self.transaction_converter.clone(); let valid_proposals = Arc::clone(&self.valid_proposals); - let chain_id = self.config.chain_id.clone(); let proposal_id = ProposalId(self.proposal_id); self.proposal_id += 1; let gas_prices = self.gas_prices(); @@ -474,7 +495,6 @@ impl SequencerConsensusContext { let handle = tokio::spawn( async move { validate_proposal( - chain_id, proposal_id, batcher.as_ref(), height, @@ -485,6 +505,7 @@ impl SequencerConsensusContext { fin_sender, cancel_token_clone, gas_prices, + transaction_converter, ) .await } @@ -515,6 +536,7 @@ async fn build_proposal( proposal_id: ProposalId, cende_write_success: AbortOnDropHandle, gas_prices: GasPrices, + transaction_converter: TransactionConverter, ) { initialize_build(proposal_id, &proposal_init, timeout, batcher.as_ref(), gas_prices).await; proposal_sender @@ -522,9 +544,14 @@ async fn build_proposal( .await .expect("Failed to send proposal init"); - let Some((proposal_content_id, content)) = - get_proposal_content(proposal_id, batcher.as_ref(), proposal_sender, cende_write_success) - .await + let Some((proposal_content_id, content)) = get_proposal_content( + proposal_id, + batcher.as_ref(), + proposal_sender, + cende_write_success, + &transaction_converter, + ) + .await else { return; }; @@ -586,7 +613,8 @@ async fn get_proposal_content( batcher: &dyn BatcherClient, mut proposal_sender: mpsc::Sender, cende_write_success: AbortOnDropHandle, -) -> Option<(ProposalContentId, Vec)> { + transaction_converter: &TransactionConverter, +) -> Option<(ProposalContentId, Vec)> { let mut content = Vec::new(); loop { // We currently want one part of the node failing to cause all components to fail. If this @@ -606,8 +634,15 @@ async fn get_proposal_content( "Sending transaction batch with {} txs.", txs.len() ); - let transactions = - txs.into_iter().map(|tx| tx.into()).collect::>(); + let transactions = futures::future::join_all(txs.into_iter().map(|tx| { + transaction_converter.convert_internal_consensus_tx_to_consensus_tx(tx) + })) + .await + .into_iter() + .collect::, _>>() + // TODO(shahak): Don't panic here. + .expect("Failed converting consensus transaction to external representation"); + debug!("Converted transactions to external representation."); trace!(?transactions, "Sending transaction batch with {} txs.", transactions.len()); proposal_sender .send(ProposalPart::Transactions(TransactionBatch { transactions })) @@ -651,7 +686,6 @@ async fn get_proposal_content( // TODO(Arni): Remove the clippy when switch to ProposalInit. #[allow(clippy::too_many_arguments)] async fn validate_proposal( - chain_id: ChainId, proposal_id: ProposalId, batcher: &dyn BatcherClient, height: BlockNumber, @@ -662,6 +696,7 @@ async fn validate_proposal( fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>, cancel_token: CancellationToken, gas_prices: GasPrices, + transaction_converter: TransactionConverter, ) { initiate_validation(batcher, proposal_id, height, proposer, timeout, gas_prices).await; @@ -685,7 +720,7 @@ async fn validate_proposal( batcher, proposal_part, &mut content, - chain_id.clone() + &transaction_converter, ).await { HandledProposalPart::Finished(built_block, received_fin) => { break (built_block, received_fin); @@ -760,27 +795,26 @@ async fn handle_proposal_part( proposal_id: ProposalId, batcher: &dyn BatcherClient, proposal_part: Option, - content: &mut Vec, - chain_id: ChainId, + content: &mut Vec, + transaction_converter: &TransactionConverter, ) -> HandledProposalPart { match proposal_part { None => HandledProposalPart::Failed("Failed to receive proposal content".to_string()), Some(ProposalPart::Transactions(TransactionBatch { transactions: txs })) => { debug!("Received transaction batch with {} txs", txs.len()); - let exe_txs: Vec = txs - .into_iter() - .map(|tx| { - // An error means we have an invalid chain_id. - (tx, &chain_id) - .try_into() - .expect("Failed to convert transaction to executable_transation.") - }) - .collect(); - content.extend_from_slice(&exe_txs[..]); - let input = SendProposalContentInput { - proposal_id, - content: SendProposalContent::Txs(exe_txs), - }; + let txs = futures::future::join_all(txs.into_iter().map(|tx| { + transaction_converter.convert_consensus_tx_to_internal_consensus_tx(tx) + })) + .await + .into_iter() + .collect::, _>>() + // TODO(shahak): Don't panic here. + .expect("Failed converting consensus transaction to internal representation"); + debug!("Converted transactions to internal representation."); + + content.extend_from_slice(&txs[..]); + let input = + SendProposalContentInput { proposal_id, content: SendProposalContent::Txs(txs) }; let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| { panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}") }); diff --git a/crates/starknet_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/starknet_consensus_orchestrator/src/sequencer_consensus_context_test.rs index a101a61606d..f1dc70604bf 100644 --- a/crates/starknet_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/starknet_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -4,6 +4,7 @@ use std::time::Duration; use std::vec; use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; use futures::future::pending; use futures::{FutureExt, SinkExt}; use lazy_static::lazy_static; @@ -23,12 +24,11 @@ use papyrus_protobuf::consensus::{ Vote, }; use starknet_api::block::{BlockHash, BlockNumber}; +use starknet_api::consensus_transaction::{ConsensusTransaction, InternalConsensusTransaction}; use starknet_api::core::{ChainId, Nonce, StateDiffCommitment}; -use starknet_api::executable_transaction::Transaction as ExecutableTransaction; use starknet_api::felt; use starknet_api::hash::PoseidonHash; -use starknet_api::test_utils::invoke::{invoke_tx, InvokeTxArgs}; -use starknet_api::transaction::Transaction; +use starknet_api::test_utils::invoke::{rpc_invoke_tx, InvokeTxArgs}; use starknet_batcher_types::batcher_types::{ GetProposalContent, GetProposalContentResponse, @@ -42,6 +42,11 @@ use starknet_batcher_types::batcher_types::{ ValidateBlockInput, }; use starknet_batcher_types::communication::MockBatcherClient; +use starknet_class_manager_types::transaction_converter::{ + TransactionConverter, + TransactionConverterTrait, +}; +use starknet_class_manager_types::EmptyClassManagerClient; use starknet_consensus::stream_handler::StreamHandler; use starknet_consensus::types::{ConsensusContext, ContextConfig}; use starknet_state_sync_types::communication::MockStateSyncClient; @@ -57,13 +62,19 @@ const STATE_DIFF_COMMITMENT: StateDiffCommitment = StateDiffCommitment(PoseidonH const CHAIN_ID: ChainId = ChainId::Mainnet; lazy_static! { - static ref TX_BATCH: Vec = (0..3).map(generate_invoke_tx).collect(); - static ref EXECUTABLE_TX_BATCH: Vec = - TX_BATCH.iter().map(|tx| (tx.clone(), &CHAIN_ID).try_into().unwrap()).collect(); + static ref TX_BATCH: Vec = + (0..3).map(generate_invoke_tx).collect(); + // TODO(shahak): Use MockTransactionConverter instead. + static ref TRANSACTION_CONVERTER: TransactionConverter = + TransactionConverter::new(Arc::new(EmptyClassManagerClient), CHAIN_ID); + static ref INTERNAL_TX_BATCH: Vec = + TX_BATCH.iter().cloned().map(|tx| { + block_on(TRANSACTION_CONVERTER.convert_consensus_tx_to_internal_consensus_tx(tx)).unwrap() + }).collect(); } -fn generate_invoke_tx(nonce: u8) -> Transaction { - Transaction::Invoke(invoke_tx(InvokeTxArgs { +fn generate_invoke_tx(nonce: u8) -> ConsensusTransaction { + ConsensusTransaction::RpcTransaction(rpc_invoke_tx(InvokeTxArgs { nonce: Nonce(felt!(nonce)), ..Default::default() })) @@ -96,6 +107,8 @@ fn setup( let context = SequencerConsensusContext::new( ContextConfig { num_validators: NUM_VALIDATORS, chain_id: CHAIN_ID, ..Default::default() }, + // TODO(shahak): Use MockTransactionConverter instead. + Arc::new(EmptyClassManagerClient), Arc::new(state_sync_client), Arc::new(batcher), outbound_proposal_stream_sender, @@ -130,7 +143,7 @@ async fn build_proposal_setup( batcher.expect_get_proposal_content().times(1).returning(move |input| { assert_eq!(input.proposal_id, *proposal_id_clone.get().unwrap()); Ok(GetProposalContentResponse { - content: GetProposalContent::Txs(EXECUTABLE_TX_BATCH.clone()), + content: GetProposalContent::Txs(INTERNAL_TX_BATCH.clone()), }) }); let proposal_id_clone = Arc::clone(&proposal_id); @@ -176,7 +189,7 @@ async fn validate_proposal_success() { let SendProposalContent::Txs(txs) = input.content else { panic!("Expected SendProposalContent::Txs, got {:?}", input.content); }; - assert_eq!(txs, *EXECUTABLE_TX_BATCH); + assert_eq!(txs, *INTERNAL_TX_BATCH); Ok(SendProposalContentResponse { response: ProposalStatus::Processing }) }, ); @@ -287,7 +300,7 @@ async fn proposals_from_different_rounds() { let SendProposalContent::Txs(txs) = input.content else { panic!("Expected SendProposalContent::Txs, got {:?}", input.content); }; - assert_eq!(txs, *EXECUTABLE_TX_BATCH); + assert_eq!(txs, *INTERNAL_TX_BATCH); Ok(SendProposalContentResponse { response: ProposalStatus::Processing }) }, ); @@ -376,7 +389,7 @@ async fn interrupt_active_proposal() { .expect_send_proposal_content() .withf(|input| { input.proposal_id == ProposalId(1) - && input.content == SendProposalContent::Txs(EXECUTABLE_TX_BATCH.clone()) + && input.content == SendProposalContent::Txs(INTERNAL_TX_BATCH.clone()) }) .times(1) .returning(move |_| { diff --git a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs index b519cbfcc31..8a876314344 100644 --- a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs +++ b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs @@ -13,8 +13,9 @@ use papyrus_storage::test_utils::CHAIN_ID_FOR_TESTS; use pretty_assertions::assert_eq; use rstest::{fixture, rstest}; use starknet_api::block::{BlockHash, BlockNumber}; +use starknet_api::consensus_transaction::ConsensusTransaction; use starknet_api::rpc_transaction::RpcTransaction; -use starknet_api::transaction::TransactionHash; +use starknet_api::transaction::{TransactionHash, TransactionHasher, TransactionVersion}; use starknet_consensus::types::ValidatorId; use starknet_infra_utils::test_utils::TestIdentifier; use starknet_integration_tests::flow_test_setup::{FlowSequencerSetup, FlowTestSetup}; @@ -205,12 +206,18 @@ async fn listen_to_broadcasted_messages( panic!("Unexpected init: {:?}", init) } StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => { - received_tx_hashes.extend( - transactions - .transactions - .iter() - .map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()), - ); + // TODO(Arni): add calculate_transaction_hash to consensus transaction and use it + // here. + received_tx_hashes.extend(transactions.transactions.iter().map(|tx| match tx { + ConsensusTransaction::RpcTransaction(tx) => { + let starknet_api_tx = + starknet_api::transaction::Transaction::from(tx.clone()); + starknet_api_tx.calculate_transaction_hash(&chain_id).unwrap() + } + ConsensusTransaction::L1Handler(tx) => { + tx.calculate_transaction_hash(&chain_id, &TransactionVersion::ZERO).unwrap() + } + })); } StreamMessageBody::Content(ProposalPart::Fin(proposal_fin)) => { assert_eq!( diff --git a/crates/starknet_sequencer_node/src/components.rs b/crates/starknet_sequencer_node/src/components.rs index 0b59cf9b9d7..9ae98e12324 100644 --- a/crates/starknet_sequencer_node/src/components.rs +++ b/crates/starknet_sequencer_node/src/components.rs @@ -79,6 +79,8 @@ pub async fn create_node_components( config.consensus_manager_config.clone(), batcher_client, state_sync_client, + // TODO(shahak): use the correct client. + Arc::new(EmptyClassManagerClient), )) } ActiveComponentExecutionMode::Disabled => None,