Skip to content

Commit

Permalink
fix(mempool): minor mempool improvements (#3113)
Browse files Browse the repository at this point in the history
## What ❔

fixes two mempool issues:
1) `gc` does not really ensure `size <= capatity` (found by
@jcsec-security), fix: purge some accounts with lowest score from
priority queue
2) Mempool actor built l2 tx filter based on current l1 gas prices. This
doesn't make sense when there is an open batch, if there is some then we
should use its fee params.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.
  • Loading branch information
perekopskiy authored Oct 21, 2024
1 parent d6de4f4 commit cd16083
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 32 deletions.
55 changes: 45 additions & 10 deletions core/lib/mempool/src/mempool_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{hash_map, BTreeSet, HashMap, HashSet};
use std::collections::{hash_map, BTreeSet, HashMap};

use zksync_types::{
l1::L1Tx, l2::L2Tx, Address, ExecuteTransactionCommon, Nonce, PriorityOpId, Transaction,
Expand Down Expand Up @@ -221,22 +221,57 @@ impl MempoolStore {
}

fn gc(&mut self) -> Vec<Address> {
if self.size >= self.capacity {
let index: HashSet<_> = self
if self.size > self.capacity {
let mut transactions = std::mem::take(&mut self.l2_transactions_per_account);
let mut possibly_kept: Vec<_> = self
.l2_priority_queue
.iter()
.map(|pointer| pointer.account)
.rev()
.filter_map(|pointer| {
transactions
.remove(&pointer.account)
.map(|txs| (pointer.account, txs))
})
.collect();
let transactions = std::mem::take(&mut self.l2_transactions_per_account);
let (kept, drained) = transactions

let mut sum = 0;
let mut number_of_accounts_kept = 0;
for (_, txs) in &possibly_kept {
sum += txs.len();
if sum <= self.capacity as usize {
number_of_accounts_kept += 1;
} else {
break;
}
}
if number_of_accounts_kept == 0 && !possibly_kept.is_empty() {
tracing::warn!("mempool capacity is too low to handle txs from single account, consider increasing capacity");
// Keep at least one entry, otherwise mempool won't return any new L2 tx to process.
number_of_accounts_kept = 1;
}
let (kept, drained) = {
let mut drained: Vec<_> = transactions.into_keys().collect();
let also_drained = possibly_kept
.split_off(number_of_accounts_kept)
.into_iter()
.map(|(address, _)| address);
drained.extend(also_drained);

(possibly_kept, drained)
};

let l2_priority_queue = std::mem::take(&mut self.l2_priority_queue);
self.l2_priority_queue = l2_priority_queue
.into_iter()
.partition(|(address, _)| index.contains(address));
self.l2_transactions_per_account = kept;
.rev()
.take(number_of_accounts_kept)
.collect();
self.l2_transactions_per_account = kept.into_iter().collect();
self.size = self
.l2_transactions_per_account
.iter()
.fold(0, |agg, (_, tnxs)| agg + tnxs.len() as u64);
return drained.into_keys().collect();
.fold(0, |agg, (_, txs)| agg + txs.len() as u64);
return drained;
}
vec![]
}
Expand Down
50 changes: 36 additions & 14 deletions core/lib/mempool/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,32 +321,26 @@ fn stashed_accounts() {

#[test]
fn mempool_capacity() {
let mut mempool = MempoolStore::new(PriorityOpId(0), 5);
let mut mempool = MempoolStore::new(PriorityOpId(0), 4);
let account0 = Address::random();
let account1 = Address::random();
let account2 = Address::random();
let account3 = Address::random();
let transactions = vec![
gen_l2_tx(account0, Nonce(0)),
gen_l2_tx(account0, Nonce(1)),
gen_l2_tx(account0, Nonce(2)),
gen_l2_tx(account1, Nonce(1)),
gen_l2_tx(account2, Nonce(1)),
gen_l2_tx_with_timestamp(account1, Nonce(0), unix_timestamp_ms() + 1),
gen_l2_tx_with_timestamp(account2, Nonce(0), unix_timestamp_ms() + 2),
gen_l2_tx(account3, Nonce(1)),
];
mempool.insert(transactions, HashMap::new());
// the mempool is full. Accounts with non-sequential nonces got stashed
// Mempool is full. Accounts with non-sequential nonces and some accounts with lowest score should be purged.
assert_eq!(
HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts),
HashSet::<_>::from_iter(vec![account1, account2]),
);
// verify that existing good-to-go transactions and new ones got picked
mempool.insert(
vec![gen_l2_tx_with_timestamp(
account1,
Nonce(0),
unix_timestamp_ms() + 1,
)],
HashMap::new(),
HashSet::from([account2, account3]),
);
// verify that good-to-go transactions are kept.
for _ in 0..3 {
assert_eq!(
mempool
Expand All @@ -363,6 +357,34 @@ fn mempool_capacity() {
.initiator_account(),
account1
);
assert!(!mempool.has_next(&L2TxFilter::default()));
}

#[test]
fn mempool_does_not_purge_all_accounts() {
let mut mempool = MempoolStore::new(PriorityOpId(0), 1);
let account0 = Address::random();
let account1 = Address::random();
let transactions = vec![
gen_l2_tx(account0, Nonce(0)),
gen_l2_tx(account0, Nonce(1)),
gen_l2_tx(account1, Nonce(1)),
];
mempool.insert(transactions, HashMap::new());
// Mempool is full. Account 1 has tx with non-sequential nonce so it should be purged.
// Txs from account 0 have sequential nonces but their number is greater than capacity; they should be kept.
assert_eq!(mempool.get_mempool_info().purged_accounts, vec![account1]);
// verify that good-to-go transactions are kept.
for _ in 0..2 {
assert_eq!(
mempool
.next_transaction(&L2TxFilter::default())
.unwrap()
.initiator_account(),
account0
);
}
assert!(!mempool.has_next(&L2TxFilter::default()));
}

fn gen_l2_tx(address: Address, nonce: Nonce) -> Transaction {
Expand Down
31 changes: 23 additions & 8 deletions core/node/state_keeper/src/mempool_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,35 @@ impl MempoolFetcher {
.await
.context("failed getting pending protocol version")?;

let l2_tx_filter = l2_tx_filter(
self.batch_fee_input_provider.as_ref(),
protocol_version.into(),
)
.await
.context("failed creating L2 transaction filter")?;
let (fee_per_gas, gas_per_pubdata) = if let Some(unsealed_batch) = storage
.blocks_dal()
.get_unsealed_l1_batch()
.await
.context("failed getting unsealed batch")?
{
let (fee_per_gas, gas_per_pubdata) = derive_base_fee_and_gas_per_pubdata(
unsealed_batch.fee_input,
protocol_version.into(),
);
(fee_per_gas, gas_per_pubdata as u32)
} else {
let filter = l2_tx_filter(
self.batch_fee_input_provider.as_ref(),
protocol_version.into(),
)
.await
.context("failed creating L2 transaction filter")?;

(filter.fee_per_gas, filter.gas_per_pubdata)
};

let transactions = storage
.transactions_dal()
.sync_mempool(
&mempool_info.stashed_accounts,
&mempool_info.purged_accounts,
l2_tx_filter.gas_per_pubdata,
l2_tx_filter.fee_per_gas,
gas_per_pubdata,
fee_per_gas,
self.sync_batch_size,
)
.await
Expand Down

0 comments on commit cd16083

Please sign in to comment.