Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(starknet_mempool): add remove old transaction API to TransactionPool #4029

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 140 additions & 16 deletions crates/starknet_mempool/src/transaction_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::cmp::Ordering;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::time::{Duration, Instant};

use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::rpc_transaction::InternalRpcTransaction;
Expand All @@ -21,6 +23,8 @@ pub struct TransactionPool {
tx_pool: HashToTransaction,
// Transactions organized by account address, sorted by ascending nonce values.
txs_by_account: AccountTransactionIndex,
// Transactions sorted by their time spent in the pool (i.e. newest to oldest).
txs_by_submission_time: TimedTransactionMap,
// Tracks the capacity of the pool.
capacity: PoolCapacity,
}
Expand All @@ -47,6 +51,16 @@ impl TransactionPool {
)
};

// Insert to timed mapping.
let unexpected_existing_tx = self.txs_by_submission_time.insert(tx_reference);
if unexpected_existing_tx.is_some() {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does not
appear in main mapping, but transaction with same hash appears in the timed
mapping",
)
};

self.capacity.add();

Ok(())
Expand All @@ -57,13 +71,10 @@ impl TransactionPool {
let tx =
self.tx_pool.remove(&tx_hash).ok_or(MempoolError::TransactionNotFound { tx_hash })?;

// Remove from account mapping.
self.txs_by_account.remove(TransactionReference::new(&tx)).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} appears in
main mapping, but does not appear in the account mapping"
)
});
// Remove reference from other mappings.
let removed_tx = vec![TransactionReference::new(&tx)];
self.remove_from_account_mapping(&removed_tx);
self.remove_from_timed_mapping(&removed_tx);

self.capacity.remove();

Expand All @@ -73,16 +84,22 @@ impl TransactionPool {
pub fn remove_up_to_nonce(&mut self, address: ContractAddress, nonce: Nonce) {
let removed_txs = self.txs_by_account.remove_up_to_nonce(address, nonce);

for TransactionReference { tx_hash, .. } in removed_txs {
self.tx_pool.remove(&tx_hash).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} appears
in account mapping, but does not appear in the main mapping"
);
});
self.remove_from_main_mapping(&removed_txs);
self.remove_from_timed_mapping(&removed_txs);

self.capacity.remove();
}
self.capacity.remove_n(removed_txs.len());
}

#[allow(dead_code)]
pub fn remove_txs_older_than(&mut self, duration: Duration) -> Vec<TransactionReference> {
let removed_txs = self.txs_by_submission_time.remove_txs_older_than(duration);

self.remove_from_main_mapping(&removed_txs);
self.remove_from_account_mapping(&removed_txs);

self.capacity.remove_n(removed_txs.len());

removed_txs
}

pub fn account_txs_sorted_by_nonce(
Expand Down Expand Up @@ -120,6 +137,40 @@ impl TransactionPool {
self.txs_by_account._contains(address)
}

fn remove_from_main_mapping(&mut self, removed_txs: &Vec<TransactionReference>) {
for TransactionReference { tx_hash, .. } in removed_txs {
self.tx_pool.remove(tx_hash).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does not \
appear in the main mapping.",
)
});
}
}

fn remove_from_account_mapping(&mut self, removed_txs: &Vec<TransactionReference>) {
for tx in removed_txs {
let tx_hash = tx.tx_hash;
self.txs_by_account.remove(*tx).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does not \
appear in the account index mapping.",
)
});
}
}

fn remove_from_timed_mapping(&mut self, removed_txs: &Vec<TransactionReference>) {
for TransactionReference { tx_hash, .. } in removed_txs {
self.txs_by_submission_time.remove(*tx_hash).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does not \
appear in the timed mapping.",
)
});
}
}

#[cfg(test)]
pub fn tx_pool(&self) -> HashMap<TransactionHash, InternalRpcTransaction> {
self.tx_pool.clone()
Expand Down Expand Up @@ -200,4 +251,77 @@ impl PoolCapacity {
self.n_txs =
self.n_txs.checked_sub(1).expect("Underflow: Cannot subtract from an empty pool.");
}

fn remove_n(&mut self, n: usize) {
self.n_txs =
self.n_txs.checked_sub(n).expect("Underflow: Cannot subtract from an empty pool.");
}
}

/// Uniquely identify a transaction submission.
#[derive(Clone, Debug, PartialEq, Eq)]
struct SubmissionID {
submission_time: Instant,
tx_hash: TransactionHash,
}

// Implementing the `Ord` trait based on the transaction's duration in the pool. I.e. a transaction
// that has been in the pool for a longer time will be considered "greater" than a transaction that
// has been in the pool for a shorter time.
impl Ord for SubmissionID {
fn cmp(&self, other: &Self) -> Ordering {
self.submission_time
.cmp(&other.submission_time)
.reverse()
.then_with(|| self.tx_hash.cmp(&other.tx_hash))
}
}

impl PartialOrd for SubmissionID {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

#[derive(Debug, Default, Eq, PartialEq)]
struct TimedTransactionMap {
txs_by_submission_time: BTreeMap<SubmissionID, TransactionReference>,
hash_to_submission_id: HashMap<TransactionHash, SubmissionID>,
}

impl TimedTransactionMap {
/// If a transaction with the same transaction hash already exists in the mapping, the previous
/// submission ID is returned.
fn insert(&mut self, tx: TransactionReference) -> Option<SubmissionID> {
// TODO(dafna, 1/3/2025): Use a Clock trait instead of Instant.
let submission_id = SubmissionID { submission_time: Instant::now(), tx_hash: tx.tx_hash };
self.txs_by_submission_time.insert(submission_id.clone(), tx);
self.hash_to_submission_id.insert(tx.tx_hash, submission_id)
}

/// Removes the transaction with the given transaction hash from the mapping.
/// Returns the removed transaction reference if it exists in the mapping.
fn remove(&mut self, tx_hash: TransactionHash) -> Option<TransactionReference> {
let submission_id = self.hash_to_submission_id.remove(&tx_hash)?;
self.txs_by_submission_time.remove(&submission_id)
}

/// Removes all transactions that were submitted to the pool before the given duration.
#[allow(dead_code)]
pub fn remove_txs_older_than(&mut self, duration: Duration) -> Vec<TransactionReference> {
let split_off_value = SubmissionID {
submission_time: Instant::now() - duration,
tx_hash: Default::default(),
};
let removed_txs: Vec<_> =
self.txs_by_submission_time.split_off(&split_off_value).into_values().collect();

for tx in removed_txs.iter() {
self.hash_to_submission_id.remove(&tx.tx_hash).expect(
"Transaction should have a submission ID if it is in the timed transaction map.",
);
}

removed_txs
}
}
Loading