Skip to content

Commit

Permalink
feat(starknet_mempool): add remove old transaction API to Transaction…
Browse files Browse the repository at this point in the history
…Pool
  • Loading branch information
dafnamatsry committed Feb 9, 2025
1 parent 149efdf commit 8560ae0
Showing 1 changed file with 149 additions and 18 deletions.
167 changes: 149 additions & 18 deletions crates/starknet_mempool/src/transaction_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::cmp::Ordering;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::time::{Duration, Instant};

use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::rpc_transaction::InternalRpcTransaction;
Expand All @@ -21,6 +23,9 @@ pub struct TransactionPool {
tx_pool: HashToTransaction,
// Transactions organized by account address, sorted by ascending nonce values.
txs_by_account: AccountTransactionIndex,
// Transactions sorted by their time spent in the pool, i.e., in descending order of submission
// time.
txs_by_submission_time: TimedTransactionMap,
// Tracks the capacity of the pool.
capacity: PoolCapacity,
}
Expand All @@ -47,6 +52,16 @@ impl TransactionPool {
)
};

// Insert to timed mapping.
let unexpected_existing_tx = self.txs_by_submission_time.insert(tx_reference);
if unexpected_existing_tx.is_some() {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does not
appear in main mapping, but transaction with same hash appears in the timed
mapping",
)
};

self.capacity.add();

Ok(())
Expand All @@ -57,32 +72,24 @@ impl TransactionPool {
let tx =
self.tx_pool.remove(&tx_hash).ok_or(MempoolError::TransactionNotFound { tx_hash })?;

// Remove from account mapping.
self.txs_by_account.remove(TransactionReference::new(&tx)).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} appears in
main mapping, but does not appear in the account mapping"
)
});

self.capacity.remove();
// Remove reference from other mappings.
let removed_tx = vec![TransactionReference::new(&tx)];
self.align_other_mappings_with(&removed_tx, PoolMappings::Main);

Ok(tx)
}

pub fn remove_up_to_nonce(&mut self, address: ContractAddress, nonce: Nonce) {
let removed_txs = self.txs_by_account.remove_up_to_nonce(address, nonce);
self.align_other_mappings_with(&removed_txs, PoolMappings::AccountTransactionIndex);
}

for TransactionReference { tx_hash, .. } in removed_txs {
self.tx_pool.remove(&tx_hash).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} appears
in account mapping, but does not appear in the main mapping"
);
});
#[allow(dead_code)]
pub fn remove_txs_older_than(&mut self, duration: Duration) -> Vec<TransactionReference> {
let removed_txs = self.txs_by_submission_time.remove_txs_older_than(duration);
self.align_other_mappings_with(&removed_txs, PoolMappings::TimedTransactionMap);

self.capacity.remove();
}
removed_txs
}

pub fn account_txs_sorted_by_nonce(
Expand Down Expand Up @@ -120,6 +127,47 @@ impl TransactionPool {
self.txs_by_account._contains(address)
}

// Rmoves the given `removed_txs` from all the pool mappings, except for the one specified in
// `skip_mapping`.
fn align_other_mappings_with(
&mut self,
removed_txs: &Vec<TransactionReference>,
skip_mapping: PoolMappings,
) {
for tx in removed_txs {
let tx_hash = tx.tx_hash;

if skip_mapping != PoolMappings::Main {
self.tx_pool.remove(&tx_hash).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does \
not appear in the main mapping.",
)
});
}

if skip_mapping != PoolMappings::AccountTransactionIndex {
self.txs_by_account.remove(*tx).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does \
not appear in the account index mapping.",
)
});
}

if skip_mapping != PoolMappings::TimedTransactionMap {
self.txs_by_submission_time.remove(tx_hash).unwrap_or_else(|| {
panic!(
"Transaction pool consistency error: transaction with hash {tx_hash} does \
not appear in the timed mapping.",
)
});
}

self.capacity.remove();
}
}

#[cfg(test)]
pub fn content(&self) -> TransactionPoolContent {
TransactionPoolContent { tx_pool: self.tx_pool.clone() }
Expand Down Expand Up @@ -207,3 +255,86 @@ impl PoolCapacity {
self.n_txs.checked_sub(1).expect("Underflow: Cannot subtract from an empty pool.");
}
}

#[derive(Eq, PartialEq)]
enum PoolMappings {
Main,
AccountTransactionIndex,
TimedTransactionMap,
}

/// Uniquly identify a transaction submission.
#[derive(Clone, Debug)]
struct SubmissionID {
submission_time: Instant,
tx_hash: TransactionHash,
}

impl PartialEq for SubmissionID {
fn eq(&self, other: &Self) -> bool {
self.submission_time == other.submission_time && self.tx_hash == other.tx_hash
}
}

impl Eq for SubmissionID {}

// Implementing the `Ord` trait based on the transaction's duration in the pool. I.e. a transaction
// that has been in the pool for a longer time will be considered "greater" than a transaction that
// has been in the pool for a shorter time.
impl Ord for SubmissionID {
fn cmp(&self, other: &Self) -> Ordering {
self.submission_time
.cmp(&other.submission_time)
.reverse()
.then_with(|| self.tx_hash.cmp(&other.tx_hash))
}
}

impl PartialOrd for SubmissionID {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

#[derive(Debug, Default, Eq, PartialEq)]
struct TimedTransactionMap {
txs_by_submission_time: BTreeMap<SubmissionID, TransactionReference>,
hash_to_submission_id: HashMap<TransactionHash, SubmissionID>,
}

impl TimedTransactionMap {
/// If the transaction with the same transaction hash already exists in the mapping, the old
/// submission ID is returned.
fn insert(&mut self, tx: TransactionReference) -> Option<SubmissionID> {
// TODO(dafna, 1/3/2025): Use a Clock trait instead of Instant.
let submission_id = SubmissionID { submission_time: Instant::now(), tx_hash: tx.tx_hash };
self.txs_by_submission_time.insert(submission_id.clone(), tx);
self.hash_to_submission_id.insert(tx.tx_hash, submission_id)
}

/// Removes the transaction with the given transaction hash from the mapping.
/// Returns the removed transaction reference if it exists in the mapping.
fn remove(&mut self, tx_hash: TransactionHash) -> Option<TransactionReference> {
let submission_id = self.hash_to_submission_id.remove(&tx_hash)?;
self.txs_by_submission_time.remove(&submission_id)
}

/// Removes all transactions that were submitted to the pool before the given duration.
#[allow(dead_code)]
pub fn remove_txs_older_than(&mut self, duration: Duration) -> Vec<TransactionReference> {
let split_off_value = SubmissionID {
submission_time: Instant::now() - duration,
tx_hash: Default::default(),
};
let removed_txs: Vec<_> =
self.txs_by_submission_time.split_off(&split_off_value).into_values().collect();

for tx in removed_txs.iter() {
self.hash_to_submission_id.remove(&tx.tx_hash).expect(
"Transaction should have a submission ID if it is in the timed transaction map.",
);
}

removed_txs
}
}

0 comments on commit 8560ae0

Please sign in to comment.