Skip to content

Commit

Permalink
feat(starknet_consensus_orchestrator): use InternalConsensusTransacti…
Browse files Browse the repository at this point in the history
…on in consensus
  • Loading branch information
ShahakShama committed Jan 30, 2025
1 parent dc59e43 commit 9a76b23
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 54 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/starknet_consensus_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ papyrus_protobuf.workspace = true
serde.workspace = true
starknet_api.workspace = true
starknet_batcher_types.workspace = true
starknet_class_manager_types.workspace = true
starknet_consensus.workspace = true
starknet_consensus_orchestrator.workspace = true
starknet_infra_utils.workspace = true
Expand Down
9 changes: 7 additions & 2 deletions crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use papyrus_protobuf::consensus::{HeightAndRound, ProposalPart, StreamMessage, V
use starknet_api::block::BlockNumber;
use starknet_batcher_types::batcher_types::RevertBlockInput;
use starknet_batcher_types::communication::SharedBatcherClient;
use starknet_class_manager_types::SharedClassManagerClient;
use starknet_consensus::stream_handler::StreamHandler;
use starknet_consensus::types::ConsensusError;
use starknet_consensus_orchestrator::cende::CendeAmbassador;
Expand All @@ -33,15 +34,17 @@ pub struct ConsensusManager {
pub config: ConsensusManagerConfig,
pub batcher_client: SharedBatcherClient,
pub state_sync_client: SharedStateSyncClient,
pub class_manager_client: SharedClassManagerClient,
}

impl ConsensusManager {
pub fn new(
config: ConsensusManagerConfig,
batcher_client: SharedBatcherClient,
state_sync_client: SharedStateSyncClient,
class_manager_client: SharedClassManagerClient,
) -> Self {
Self { config, batcher_client, state_sync_client }
Self { config, batcher_client, state_sync_client, class_manager_client }
}

pub async fn run(&self) -> Result<(), ConsensusError> {
Expand Down Expand Up @@ -94,6 +97,7 @@ impl ConsensusManager {

let context = SequencerConsensusContext::new(
self.config.context_config.clone(),
Arc::clone(&self.class_manager_client),
Arc::clone(&self.state_sync_client),
Arc::clone(&self.batcher_client),
outbound_internal_sender,
Expand Down Expand Up @@ -179,8 +183,9 @@ pub fn create_consensus_manager(
config: ConsensusManagerConfig,
batcher_client: SharedBatcherClient,
state_sync_client: SharedStateSyncClient,
class_manager_client: SharedClassManagerClient,
) -> ConsensusManager {
ConsensusManager::new(config, batcher_client, state_sync_client)
ConsensusManager::new(config, batcher_client, state_sync_client, class_manager_client)
}

#[async_trait]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use rstest::rstest;
use starknet_api::block::BlockNumber;
use starknet_batcher_types::batcher_types::{GetHeightResponse, RevertBlockInput};
use starknet_batcher_types::communication::MockBatcherClient;
use starknet_class_manager_types::EmptyClassManagerClient;
use starknet_state_sync_types::communication::MockStateSyncClient;
use tokio::time::{timeout, Duration};

Expand Down Expand Up @@ -41,6 +42,7 @@ async fn revert_batcher_blocks() {
manager_config,
Arc::new(mock_batcher_client),
Arc::new(MockStateSyncClient::new()),
Arc::new(EmptyClassManagerClient),
);

// TODO(Shahak, dvir): try to solve this better (the test will take 100 milliseconds to run).
Expand Down Expand Up @@ -68,6 +70,7 @@ async fn revert_with_invalid_height_panics(#[case] revert_up_to_and_including: B
consensus_manager_config,
Arc::new(mock_batcher),
Arc::new(MockStateSyncClient::new()),
Arc::new(EmptyClassManagerClient),
);

consensus_manager.run().await.unwrap();
Expand All @@ -86,6 +89,7 @@ async fn no_reverts_without_config() {
manager_config,
Arc::new(mock_batcher),
Arc::new(MockStateSyncClient::new()),
Arc::new(EmptyClassManagerClient),
);

// TODO(Shahak, dvir): try to solve this better (the test will take 100 milliseconds to run).
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_consensus_orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ serde_json = { workspace = true, features = ["arbitrary_precision"] }
starknet-types-core.workspace = true
starknet_api.workspace = true
starknet_batcher_types = { workspace = true, features = ["testing"] }
starknet_class_manager_types.workspace = true
starknet_consensus.workspace = true
starknet_infra_utils.workspace = true
starknet_state_sync_types = { workspace = true, features = ["testing"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use starknet_api::block::{
GasPrices,
NonzeroGasPrice,
};
use starknet_api::core::{ChainId, ContractAddress, SequencerContractAddress};
use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
use starknet_api::transaction::{Transaction, TransactionHash};
use starknet_api::consensus_transaction::InternalConsensusTransaction;
use starknet_api::core::{ContractAddress, SequencerContractAddress};
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{
DecisionReachedInput,
DecisionReachedResponse,
Expand All @@ -52,6 +52,11 @@ use starknet_batcher_types::batcher_types::{
ValidateBlockInput,
};
use starknet_batcher_types::communication::BatcherClient;
use starknet_class_manager_types::transaction_converter::{
TransactionConverter,
TransactionConverterTrait,
};
use starknet_class_manager_types::SharedClassManagerClient;
use starknet_consensus::types::{
ConsensusContext,
ConsensusError,
Expand Down Expand Up @@ -88,8 +93,10 @@ const TEMPORARY_GAS_PRICES: GasPrices = GasPrices {
// {height: {proposal_id: (content, [proposal_ids])}}
// Note that multiple proposals IDs can be associated with the same content, but we only need to
// store one of them.
type HeightToIdToContent =
BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<ExecutableTransaction>, ProposalId)>>;
type HeightToIdToContent = BTreeMap<
BlockNumber,
HashMap<ProposalContentId, (Vec<InternalConsensusTransaction>, ProposalId)>,
>;
type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver<ProposalPart>);

const CHANNEL_SIZE: usize = 100;
Expand All @@ -112,7 +119,8 @@ const BUILD_PROPOSAL_MARGIN: Duration = Duration::from_millis(1000);
const VALIDATE_PROPOSAL_MARGIN: Duration = Duration::from_secs(10);

pub struct SequencerConsensusContext {
config: ContextConfig,
// TODO(Shahak): change this into a dynamic TransactionConverterTrait.
transaction_converter: TransactionConverter,
state_sync_client: SharedStateSyncClient,
batcher: Arc<dyn BatcherClient>,
validators: Vec<ValidatorId>,
Expand Down Expand Up @@ -146,6 +154,7 @@ pub struct SequencerConsensusContext {
impl SequencerConsensusContext {
pub fn new(
config: ContextConfig,
class_manager_client: SharedClassManagerClient,
state_sync_client: SharedStateSyncClient,
batcher: Arc<dyn BatcherClient>,
outbound_proposal_sender: mpsc::Sender<(HeightAndRound, mpsc::Receiver<ProposalPart>)>,
Expand All @@ -154,7 +163,10 @@ impl SequencerConsensusContext {
) -> Self {
let num_validators = config.num_validators;
Self {
config,
transaction_converter: TransactionConverter::new(
class_manager_client,
config.chain_id.clone(),
),
state_sync_client,
batcher,
outbound_proposal_sender,
Expand Down Expand Up @@ -215,6 +227,7 @@ impl ConsensusContext for SequencerConsensusContext {
.await
.expect("Failed to send proposal receiver");
let gas_prices = self.gas_prices();
let transaction_converter = self.transaction_converter.clone();

info!(?proposal_init, ?timeout, %proposal_id, "Building proposal");
let handle = tokio::spawn(
Expand All @@ -229,6 +242,7 @@ impl ConsensusContext for SequencerConsensusContext {
proposal_id,
cende_write_success,
gas_prices,
transaction_converter,
)
.await;
}
Expand Down Expand Up @@ -376,6 +390,13 @@ impl ConsensusContext for SequencerConsensusContext {
// TODO(dvir): pass here real `BlobParameters` info.
// TODO(dvir): when passing here the correct `BlobParameters`, also test that
// `prepare_blob_for_next_height` is called with the correct parameters.
let transactions = futures::future::join_all(transactions.into_iter().map(|tx| {
self.transaction_converter.convert_internal_consensus_tx_to_executable_tx(tx)
}))
.await
.into_iter().collect::<Result<Vec<_>, _>>()
// TODO(shahak): Do not panic here.
.expect("Failed converting transactions for cende");
self.cende_ambassador
.prepare_blob_for_next_height(BlobParameters {
// TODO(dvir): use the real `BlockInfo` when consensus will save it.
Expand Down Expand Up @@ -463,8 +484,8 @@ impl SequencerConsensusContext {
let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();
let batcher = Arc::clone(&self.batcher);
let transaction_converter = self.transaction_converter.clone();
let valid_proposals = Arc::clone(&self.valid_proposals);
let chain_id = self.config.chain_id.clone();
let proposal_id = ProposalId(self.proposal_id);
self.proposal_id += 1;
let gas_prices = self.gas_prices();
Expand All @@ -474,7 +495,6 @@ impl SequencerConsensusContext {
let handle = tokio::spawn(
async move {
validate_proposal(
chain_id,
proposal_id,
batcher.as_ref(),
height,
Expand All @@ -485,6 +505,7 @@ impl SequencerConsensusContext {
fin_sender,
cancel_token_clone,
gas_prices,
transaction_converter,
)
.await
}
Expand Down Expand Up @@ -515,16 +536,22 @@ async fn build_proposal(
proposal_id: ProposalId,
cende_write_success: AbortOnDropHandle<bool>,
gas_prices: GasPrices,
transaction_converter: TransactionConverter,
) {
initialize_build(proposal_id, &proposal_init, timeout, batcher.as_ref(), gas_prices).await;
proposal_sender
.send(ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");

let Some((proposal_content_id, content)) =
get_proposal_content(proposal_id, batcher.as_ref(), proposal_sender, cende_write_success)
.await
let Some((proposal_content_id, content)) = get_proposal_content(
proposal_id,
batcher.as_ref(),
proposal_sender,
cende_write_success,
&transaction_converter,
)
.await
else {
return;
};
Expand Down Expand Up @@ -586,7 +613,8 @@ async fn get_proposal_content(
batcher: &dyn BatcherClient,
mut proposal_sender: mpsc::Sender<ProposalPart>,
cende_write_success: AbortOnDropHandle<bool>,
) -> Option<(ProposalContentId, Vec<ExecutableTransaction>)> {
transaction_converter: &TransactionConverter,
) -> Option<(ProposalContentId, Vec<InternalConsensusTransaction>)> {
let mut content = Vec::new();
loop {
// We currently want one part of the node failing to cause all components to fail. If this
Expand All @@ -606,8 +634,15 @@ async fn get_proposal_content(
"Sending transaction batch with {} txs.",
txs.len()
);
let transactions =
txs.into_iter().map(|tx| tx.into()).collect::<Vec<Transaction>>();
let transactions = futures::future::join_all(txs.into_iter().map(|tx| {
transaction_converter.convert_internal_consensus_tx_to_consensus_tx(tx)
}))
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
// TODO(shahak): Don't panic here.
.expect("Failed converting consensus transaction to external representation");
debug!("Converted transactions to external representation.");
trace!(?transactions, "Sending transaction batch with {} txs.", transactions.len());
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch { transactions }))
Expand Down Expand Up @@ -651,7 +686,6 @@ async fn get_proposal_content(
// TODO(Arni): Remove the clippy when switch to ProposalInit.
#[allow(clippy::too_many_arguments)]
async fn validate_proposal(
chain_id: ChainId,
proposal_id: ProposalId,
batcher: &dyn BatcherClient,
height: BlockNumber,
Expand All @@ -662,6 +696,7 @@ async fn validate_proposal(
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
cancel_token: CancellationToken,
gas_prices: GasPrices,
transaction_converter: TransactionConverter,
) {
initiate_validation(batcher, proposal_id, height, proposer, timeout, gas_prices).await;

Expand All @@ -685,7 +720,7 @@ async fn validate_proposal(
batcher,
proposal_part,
&mut content,
chain_id.clone()
&transaction_converter,
).await {
HandledProposalPart::Finished(built_block, received_fin) => {
break (built_block, received_fin);
Expand Down Expand Up @@ -760,27 +795,26 @@ async fn handle_proposal_part(
proposal_id: ProposalId,
batcher: &dyn BatcherClient,
proposal_part: Option<ProposalPart>,
content: &mut Vec<ExecutableTransaction>,
chain_id: ChainId,
content: &mut Vec<InternalConsensusTransaction>,
transaction_converter: &TransactionConverter,
) -> HandledProposalPart {
match proposal_part {
None => HandledProposalPart::Failed("Failed to receive proposal content".to_string()),
Some(ProposalPart::Transactions(TransactionBatch { transactions: txs })) => {
debug!("Received transaction batch with {} txs", txs.len());
let exe_txs: Vec<ExecutableTransaction> = txs
.into_iter()
.map(|tx| {
// An error means we have an invalid chain_id.
(tx, &chain_id)
.try_into()
.expect("Failed to convert transaction to executable_transation.")
})
.collect();
content.extend_from_slice(&exe_txs[..]);
let input = SendProposalContentInput {
proposal_id,
content: SendProposalContent::Txs(exe_txs),
};
let txs = futures::future::join_all(txs.into_iter().map(|tx| {
transaction_converter.convert_consensus_tx_to_internal_consensus_tx(tx)
}))
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
// TODO(shahak): Don't panic here.
.expect("Failed converting consensus transaction to internal representation");
debug!("Converted transactions to internal representation.");

content.extend_from_slice(&txs[..]);
let input =
SendProposalContentInput { proposal_id, content: SendProposalContent::Txs(txs) };
let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
});
Expand Down
Loading

0 comments on commit 9a76b23

Please sign in to comment.