Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Add metric #256

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
32 changes: 20 additions & 12 deletions aptos-core/consensus/src/quorum_store/batch_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl BatchGenerator {
) -> Vec<Batch> {
// Sort by gas, in descending order. This is a stable sort on existing mempool ordering,
// so will not reorder accounts or their sequence numbers as long as they have the same gas.
pulled_txns.sort_by_key(|txn| u64::MAX - txn.gas_unit_price());
// pulled_txns.sort_by_key(|txn| u64::MAX - txn.gas_unit_price());

let reverse_buckets_excluding_zero: Vec<_> = self
.config
Expand All @@ -262,13 +262,8 @@ impl BatchGenerator {
}

// Search for key in descending gas order
let num_txns_in_bucket = match pulled_txns
.binary_search_by_key(&(u64::MAX - (*bucket_start - 1), PeerId::ZERO), |txn| {
(u64::MAX - txn.gas_unit_price(), txn.sender())
}) {
Ok(index) => index,
Err(index) => index,
};
let num_txns_in_bucket = pulled_txns
.len();
if num_txns_in_bucket == 0 {
continue;
}
Expand Down Expand Up @@ -328,12 +323,16 @@ impl BatchGenerator {
}

pub(crate) async fn handle_scheduled_pull(&mut self, max_count: u64) -> Vec<Batch> {
let start = Instant::now();
counters::BATCH_PULL_EXCLUDED_TXNS.observe(self.txns_in_progress_sorted.len() as f64);
trace!(
"QS: excluding txs len: {:?}",
self.txns_in_progress_sorted.len()
);

counters::MAX_BATCH_COUNT.observe(max_count as f64);
counters::MAX_BATCH_BYTES.observe(self.config.sender_max_total_bytes as f64);

let mut pulled_txns = self
.mempool_proxy
.pull_internal(
Expand All @@ -343,7 +342,7 @@ impl BatchGenerator {
)
.await
.unwrap_or_default();

println!("mempool_proxy.pull_internal take {} ms", start.elapsed().as_millis());
trace!("QS: pulled_txns len: {:?}", pulled_txns.len());

if pulled_txns.is_empty() {
Expand All @@ -370,7 +369,7 @@ impl BatchGenerator {
let batches = self.bucket_into_batches(&mut pulled_txns, expiry_time);
self.last_end_batch_time = Instant::now();
counters::BATCH_CREATION_COMPUTE_LATENCY.observe_duration(bucket_compute_start.elapsed());

println!("handle_scheduled_pull take {} ms", start.elapsed().as_millis());
batches
}

Expand Down Expand Up @@ -404,7 +403,8 @@ impl BatchGenerator {
let mut dynamic_pull_txn_per_s = (self.config.back_pressure.dynamic_min_txn_per_s
+ self.config.back_pressure.dynamic_max_txn_per_s)
/ 2;

let mut last_call = Instant::now();
let mut current_call = Instant::now();
loop {
let _timer = counters::BATCH_GENERATOR_MAIN_LOOP.start_timer();

Expand All @@ -413,7 +413,7 @@ impl BatchGenerator {
self.back_pressure = updated_back_pressure;
},
_ = interval.tick() => monitor!("batch_generator_handle_tick", {

current_call = Instant::now();
let tick_start = Instant::now();
// TODO: refactor back_pressure logic into its own function
if self.back_pressure.txn_count {
Expand Down Expand Up @@ -466,6 +466,7 @@ impl BatchGenerator {
);
let batches = self.handle_scheduled_pull(pull_max_txn).await;
if !batches.is_empty() {
let start = Instant::now();
last_non_empty_pull = tick_start;

let persist_start = Instant::now();
Expand All @@ -477,6 +478,9 @@ impl BatchGenerator {
counters::BATCH_CREATION_PERSIST_LATENCY.observe_duration(persist_start.elapsed());

network_sender.broadcast_batch_msg(batches).await;
println!("broadcasted batch msg take {} ms", start.elapsed().as_millis());
println!("handle_tick take {} ms", tick_start.elapsed().as_millis());
println!("handle_tick take interval {} ms", current_call.duration_since(last_call).as_millis());
} else if tick_start.elapsed() > interval.period().checked_div(2).unwrap_or(Duration::ZERO) {
// If the pull takes too long, it's also accounted as a non-empty pull to avoid pulling too often.
last_non_empty_pull = tick_start;
Expand All @@ -489,8 +493,11 @@ impl BatchGenerator {
);
}
}
last_call = current_call;

}),
Some(cmd) = cmd_rx.recv() => monitor!("batch_generator_handle_command", {
let start = Instant::now();
match cmd {
BatchGeneratorCommand::CommitNotification(block_timestamp, batches) => {
trace!(
Expand Down Expand Up @@ -528,6 +535,7 @@ impl BatchGenerator {
);
}
}
println!("handle_commit_notification take {} ms", start.elapsed().as_millis());
},
BatchGeneratorCommand::ProofExpiration(batch_ids) => {
for batch_id in batch_ids {
Expand Down
14 changes: 14 additions & 0 deletions aptos-core/consensus/src/quorum_store/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,3 +877,17 @@ pub static BATCH_RECEIVED_LATE_REPLIES_COUNT: Lazy<IntCounter> = Lazy::new(|| {
)
.unwrap()
});

/// Histogram (registered as an averaging counter) recording the `max_count`
/// cap passed to each scheduled batch pull in the quorum store.
/// NOTE(review): despite the help text, this observes the per-pull maximum
/// txn count limit, not a count of batches currently stored — confirm intent.
pub static MAX_BATCH_COUNT: Lazy<Histogram> = Lazy::new(|| {
    register_avg_counter(
        "quorum_store_max_batch_count",
        "Maximum number of batches in the quorum store."
    )
});

/// Histogram (registered as an averaging counter) recording the configured
/// `sender_max_total_bytes` limit observed at each scheduled batch pull.
/// NOTE(review): this observes a static config value on every pull, so the
/// average will simply equal the config — verify a gauge wasn't intended.
pub static MAX_BATCH_BYTES: Lazy<Histogram> = Lazy::new(|| {
    register_avg_counter(
        "quorum_store_max_batch_bytes",
        "Maximum number of bytes in the quorum store."
    )
});
1 change: 0 additions & 1 deletion aptos-core/consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ impl RoundManager {
new_round_event: NewRoundEvent,
) -> anyhow::Result<()> {
// TODO(gravity_jan): add sleep 1s for reth compatibility
tokio::time::sleep(Duration::from_millis(500)).await;
counters::CURRENT_ROUND.set(new_round_event.round as i64);
counters::ROUND_TIMEOUT_MS.set(new_round_event.timeout.as_millis() as i64);
match new_round_event.reason {
Expand Down
36 changes: 33 additions & 3 deletions aptos-core/mempool/src/core_mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,20 @@ use aptos_types::{
};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::{atomic::Ordering},
sync::{atomic::Ordering, Mutex, OnceLock},
time::{Duration, Instant, SystemTime},
};

use super::transaction::VerifiedTxn;

/// Ad-hoc throughput accumulator for `get_batch`: tracks how many txns were
/// pulled and how much wall time elapsed across calls, printed and reset once
/// at least one second has accumulated.
/// NOTE(review): this duplicates what a Prometheus counter/histogram already
/// provides and prints via `println!` — consider removing before merge.
struct TimeMetric {
    // Accumulated milliseconds since the last report (reset after printing).
    interval: u128,
    // Timestamp of the previous `get_batch` call; used to grow `interval`.
    last: Instant,
    // Total txns returned across calls in the current reporting window.
    txn_count: u64,
    // Number of `get_batch` calls in the current reporting window.
    call_count: u64,
}

// Process-wide singleton; lazily initialized on first `get_batch` call.
static TIME_METRIC: OnceLock<Mutex<TimeMetric>> = OnceLock::new();

pub struct Mempool {
// Stores the metadata of all transactions in mempool (of all states).
Expand Down Expand Up @@ -273,7 +281,7 @@ impl Mempool {

/// Used to add a transaction to the Mempool.
/// Performs basic validation: checks account's sequence number.
pub(crate) fn send_user_txn(
pub(crate) fn add_txn(
&mut self,
txn: VerifiedTxn,
db_sequence_number: u64,
Expand Down Expand Up @@ -341,6 +349,7 @@ impl Mempool {
.as_str(),
ranking_score,
);
counters::CORE_MEMPOOL_ADD_TXNS.inc();
status
}

Expand Down Expand Up @@ -437,6 +446,7 @@ impl Mempool {
skipped.insert((txn.address, tx_seq));
}
}
info!("walked: {} skipped: {}, exclude: {}", txn_walked, skipped.len(), exclude_transactions.len());
let result_size = result.len();
let result_end_time = start_time.elapsed();
let result_time = result_end_time.saturating_sub(gas_end_time);
Expand Down Expand Up @@ -503,16 +513,36 @@ impl Mempool {
)
);
}
let tm = TIME_METRIC.get_or_init(|| Mutex::new(TimeMetric {
interval: 60_000,
last: Instant::now(),
call_count: 0,
txn_count: 0,
}));
let mut tm = tm.lock().unwrap();
tm.txn_count += result_size as u64;
tm.interval += tm.last.elapsed().as_millis();
tm.call_count += 1;
if tm.interval >= 1000 && tm.call_count > 0 && tm.txn_count > 0 {
println!("mempool get batch txn_count: {} in {} {}/s avg call take {:?}", tm.txn_count, tm.interval, (tm.txn_count * 1000) as f64 / tm.interval as f64, tm.interval as f64 / tm.call_count as f64);
tm.txn_count = 0;
tm.interval = 0;
tm.call_count = 0;
}
tm.last = Instant::now();


if !return_non_full && !full_bytes && (block.len() as u64) < max_txns {
block.clear();
}

counters::AVG_BATCH_SIZE.observe(block.len() as f64);
counters::GET_BATCH_SIZE.inc_by(block.len() as u64);
counters::mempool_service_transactions(counters::GET_BLOCK_LABEL, block.len());
counters::MEMPOOL_SERVICE_BYTES_GET_BLOCK.observe(total_bytes as f64);
for transaction in &block {
self.log_consensus_pulled_latency(transaction.sender(), transaction.sequence_number());
}
println!("max txns: {}, max bytes: {}, return_non_full: {}, block len: {:?}, exclude {:?}", max_txns, max_bytes, return_non_full, block.len(), exclude_size);
block
}

Expand Down
28 changes: 24 additions & 4 deletions aptos-core/mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_metrics_core::{
exponential_buckets, histogram_opts, op_counters::DurationHistogram, register_histogram,
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
register_int_gauge_vec, Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec,
IntGauge, IntGaugeVec,
exponential_buckets, histogram_opts, op_counters::DurationHistogram, register_avg_counter, register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec
};
use aptos_short_hex_str::AsShortHexStr;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -669,6 +666,29 @@ pub static MAIN_LOOP: Lazy<DurationHistogram> = Lazy::new(|| {
)
});

/// Histogram of the number of transactions in each block returned by
/// `get_batch` (observed once per call, after size/byte limits are applied).
pub static AVG_BATCH_SIZE: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "aptos_mempool_avg_batch_size",
        "Average size of txns in a batch"
    )
    .unwrap()
});

/// Monotonic counter of the cumulative number of transactions handed out by
/// `get_batch` (incremented by the block length on every call).
pub static GET_BATCH_SIZE: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "aptos_mempool_get_batch_size",
        "Number of txns in a batch"
    )
    .unwrap()
});

/// Monotonic counter of transactions accepted into the core mempool; this is
/// incremented at the end of `Mempool::add_txn`. The help text previously
/// said "removed from core mempool", contradicting both the metric name
/// (`..._added_txns_count`) and its increment site — fixed to say "added".
pub static CORE_MEMPOOL_ADD_TXNS: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "aptos_core_mempool_added_txns_count",
        "Number of txns added to core mempool"
    )
    .unwrap()
});
#[cfg(test)]
mod test {
use crate::counters::RANKING_SCORE_BUCKETS;
Expand Down
5 changes: 3 additions & 2 deletions aptos-core/mempool/src/shared_mempool/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ async fn retrieve_from_execution_routine(
loop {
match execution_api.recv_pending_txns().await {
Ok(txns) => {
let mut mut_lock = mempool.lock();
info!("the recv_pending_txns size is {:?}", txns.len());
txns.into_iter().for_each(|txn_with_number| {
let r = mempool.lock().send_user_txn(
let r = mut_lock.add_txn(
txn_with_number.txn.into(),
txn_with_number.account_seq_num,
TimelineState::NotReady,
Expand All @@ -111,7 +112,7 @@ async fn retrieve_from_execution_routine(
continue;
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}

Expand Down
2 changes: 1 addition & 1 deletion aptos-core/mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ fn validate_and_add_transactions<NetworkClient>(
for (idx, (transaction, sequence_info, ready_time_at_sender, priority)) in
transactions.into_iter().enumerate()
{
let mempool_status = mempool.send_user_txn(
let mempool_status = mempool.add_txn(
transaction.into(),
sequence_info,
timeline_state,
Expand Down
4 changes: 2 additions & 2 deletions aptos-core/mempool/src/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub(crate) fn add_txns_to_mempool(
let mut transactions = vec![];
for transaction in txns {
let txn = transaction.make_signed_transaction();
pool.send_user_txn(
pool.add_txn(
(&txn).into(),
transaction.account_seqno,
TimelineState::NotReady,
Expand Down Expand Up @@ -146,7 +146,7 @@ pub(crate) fn send_user_txn(

pub(crate) fn add_signed_txn(pool: &mut CoreMempool, transaction: SignedTransaction) -> Result<()> {
match pool
.send_user_txn(
.add_txn(
(&transaction).into(),
0,
TimelineState::NotReady,
Expand Down
16 changes: 8 additions & 8 deletions aptos-core/mempool/src/tests/core_mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ fn test_transaction_metrics() {
let (mut mempool, _) = setup_mempool();

let txn = TestTransaction::new(0, 0, 1).make_signed_transaction();
mempool.send_user_txn(
mempool.add_txn(
(&txn).into(),
0,
TimelineState::NotReady,
Expand All @@ -79,7 +79,7 @@ fn test_transaction_metrics() {
Some(BroadcastPeerPriority::Primary),
);
let txn = TestTransaction::new(1, 0, 1).make_signed_transaction();
mempool.send_user_txn(
mempool.add_txn(
(&txn).into(),
0,
TimelineState::NonQualified,
Expand All @@ -88,7 +88,7 @@ fn test_transaction_metrics() {
Some(BroadcastPeerPriority::Primary),
);
let txn = TestTransaction::new(2, 0, 1).make_signed_transaction();
mempool.send_user_txn(
mempool.add_txn(
(&txn).into(),
0,
TimelineState::NotReady,
Expand Down Expand Up @@ -868,7 +868,7 @@ fn test_gc_ready_transaction() {
let txn = TestTransaction::new(1, 1, 1).make_signed_transaction_with_expiration_time(0);
let sender_bucket = sender_bucket(&txn.sender(), MempoolConfig::default().num_sender_buckets);

pool.send_user_txn(
pool.add_txn(
(&txn).into(),
0,
TimelineState::NotReady,
Expand Down Expand Up @@ -928,7 +928,7 @@ fn test_clean_stuck_transactions() {
}
let db_sequence_number = 10;
let txn = TestTransaction::new(0, db_sequence_number, 1).make_signed_transaction();
pool.send_user_txn(
pool.add_txn(
(&txn).into(),
db_sequence_number,
TimelineState::NotReady,
Expand All @@ -946,7 +946,7 @@ fn test_get_transaction_by_hash() {
let mut pool = setup_mempool().0;
let db_sequence_number = 10;
let txn = TestTransaction::new(0, db_sequence_number, 1).make_signed_transaction();
pool.send_user_txn(
pool.add_txn(
(&txn).into(),
db_sequence_number,
TimelineState::NotReady,
Expand All @@ -967,7 +967,7 @@ fn test_get_transaction_by_hash_after_the_txn_is_updated() {
let mut pool = setup_mempool().0;
let db_sequence_number = 10;
let txn = TestTransaction::new(0, db_sequence_number, 1).make_signed_transaction();
pool.send_user_txn(
pool.add_txn(
(&txn).into(),
db_sequence_number,
TimelineState::NotReady,
Expand All @@ -979,7 +979,7 @@ fn test_get_transaction_by_hash_after_the_txn_is_updated() {

// new txn with higher gas price
let new_txn = TestTransaction::new(0, db_sequence_number, 100).make_signed_transaction();
pool.send_user_txn(
pool.add_txn(
(&new_txn).into(),
db_sequence_number,
TimelineState::NotReady,
Expand Down
2 changes: 1 addition & 1 deletion aptos-core/mempool/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl MockSharedMempool {
let mut pool = self.mempool.lock();
for txn in txns {
if pool
.send_user_txn(
.add_txn(
(&txn).into(),
0,
TimelineState::NotReady,
Expand Down
Loading