From 98499ab3f8066f097cadd9b26b1909a17c33aa3f Mon Sep 17 00:00:00 2001 From: kamiyaa Date: Fri, 24 Jan 2025 15:29:06 +0800 Subject: [PATCH] refactor get_matching_lines to take in a slice of Vec - gate debug messages behind "test-utils" feature - rename message_retrieval_invariants_met to relayer_restart_invariants_met - add comment on message_retry termination invariants --- rust/main/agents/relayer/src/lib.rs | 3 +- rust/main/agents/relayer/src/msg/mod.rs | 3 +- .../agents/relayer/src/msg/pending_message.rs | 7 +- .../src/invariants/termination_invariants.rs | 24 +++---- rust/main/utils/run-locally/src/main.rs | 69 +++++++++++-------- rust/main/utils/run-locally/src/utils.rs | 35 ++++------ 6 files changed, 75 insertions(+), 66 deletions(-) diff --git a/rust/main/agents/relayer/src/lib.rs b/rust/main/agents/relayer/src/lib.rs index 62b896d628..501b70dc76 100644 --- a/rust/main/agents/relayer/src/lib.rs +++ b/rust/main/agents/relayer/src/lib.rs @@ -1,5 +1,6 @@ +pub mod msg; + mod merkle_tree; -mod msg; mod processor; mod prover; mod relayer; diff --git a/rust/main/agents/relayer/src/msg/mod.rs b/rust/main/agents/relayer/src/msg/mod.rs index e47015709c..2f13832719 100644 --- a/rust/main/agents/relayer/src/msg/mod.rs +++ b/rust/main/agents/relayer/src/msg/mod.rs @@ -32,7 +32,8 @@ pub(crate) mod gas_payment; pub(crate) mod metadata; pub(crate) mod op_queue; pub(crate) mod op_submitter; -pub(crate) mod pending_message; pub(crate) mod processor; +pub mod pending_message; + pub use gas_payment::GAS_EXPENDITURE_LOG_MESSAGE; diff --git a/rust/main/agents/relayer/src/msg/pending_message.rs b/rust/main/agents/relayer/src/msg/pending_message.rs index eba4d05af0..27464d1e6e 100644 --- a/rust/main/agents/relayer/src/msg/pending_message.rs +++ b/rust/main/agents/relayer/src/msg/pending_message.rs @@ -36,6 +36,8 @@ pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) { Duration::from_secs(60 * 10) }; +pub const RETRIEVED_MESSAGE_LOG: &str = "Message status retrieved from db"; + /// The message context contains the links needed to submit a message. Each /// instance is for a unique origin -> destination pairing. pub struct MessageContext { @@ -513,14 +515,17 @@ impl PendingMessage { // Attempt to fetch status about message from database let message_status = match ctx.origin_db.retrieve_status_by_message_id(&message.id()) { Ok(Some(status)) => { + #[cfg(feature = "test-utils")] tracing::debug!( ?status, id = format!("{:x}", message.id()), - "Message status retrieved from db" + "{}", + RETRIEVED_MESSAGE_LOG, ); status } _ => { + #[cfg(feature = "test-utils")] tracing::debug!("Message status not found in db"); PendingOperationStatus::FirstPrepareAttempt } diff --git a/rust/main/utils/run-locally/src/invariants/termination_invariants.rs b/rust/main/utils/run-locally/src/invariants/termination_invariants.rs index 6e69fd5e63..675016c760 100644 --- a/rust/main/utils/run-locally/src/invariants/termination_invariants.rs +++ b/rust/main/utils/run-locally/src/invariants/termination_invariants.rs @@ -91,11 +91,11 @@ pub fn termination_invariants_met( let relayer_logfile = File::open(log_file_path)?; let invariant_logs = &[ - STORING_NEW_MESSAGE_LOG_MESSAGE, - LOOKING_FOR_EVENTS_LOG_MESSAGE, - GAS_EXPENDITURE_LOG_MESSAGE, - HYPER_INCOMING_BODY_LOG_MESSAGE, - TX_ID_INDEXING_LOG_MESSAGE, + vec![STORING_NEW_MESSAGE_LOG_MESSAGE.to_string()], + vec![LOOKING_FOR_EVENTS_LOG_MESSAGE.to_string()], + vec![GAS_EXPENDITURE_LOG_MESSAGE.to_string()], + vec![HYPER_INCOMING_BODY_LOG_MESSAGE.to_string()], + vec![TX_ID_INDEXING_LOG_MESSAGE.to_string()], ]; let log_counts = get_matching_lines(&relayer_logfile, invariant_logs); // Zero insertion messages don't reach `submit` stage where gas is spent, so we only expect these logs for the other messages. @@ -105,23 +105,23 @@ pub fn termination_invariants_met( // EDIT: Having had a quick look, it seems like there are some legitimate reverts happening in the confirm step // (`Transaction attempting to process message either reverted or was reorged`) // in which case more gas expenditure logs than messages are expected. - let gas_expenditure_log_count = log_counts.get(GAS_EXPENDITURE_LOG_MESSAGE).unwrap(); + let gas_expenditure_log_count = log_counts[2]; assert!( - gas_expenditure_log_count >= &total_messages_expected, + gas_expenditure_log_count >= total_messages_expected, "Didn't record gas payment for all delivered messages. Got {} gas payment logs, expected at least {}", gas_expenditure_log_count, total_messages_expected ); // These tests check that we fixed https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3915, where some logs would not show up assert!( - log_counts.get(STORING_NEW_MESSAGE_LOG_MESSAGE).unwrap() > &0, + gas_expenditure_log_count > 0, "Didn't find any logs about storing messages in db" ); assert!( - log_counts.get(LOOKING_FOR_EVENTS_LOG_MESSAGE).unwrap() > &0, + log_counts[1] > 0, "Didn't find any logs about looking for events in index range" ); - let total_tx_id_log_count = log_counts.get(TX_ID_INDEXING_LOG_MESSAGE).unwrap(); + let total_tx_id_log_count = log_counts[4]; assert!( // there are 3 txid-indexed events: // - relayer: merkle insertion and gas payment @@ -129,13 +129,13 @@ pub fn termination_invariants_met( // some logs are emitted for multiple events, so requiring there to be at least // `config.kathy_messages` logs is a reasonable approximation, since all three of these events // are expected to be logged for each message. - *total_tx_id_log_count as u64 >= config.kathy_messages, + total_tx_id_log_count as u64 >= config.kathy_messages, "Didn't find as many tx id logs as expected. Found {} and expected {}", total_tx_id_log_count, config.kathy_messages ); assert!( - log_counts.get(HYPER_INCOMING_BODY_LOG_MESSAGE).is_none(), + log_counts[3] == 0, "Verbose logs not expected at the log level set in e2e" ); diff --git a/rust/main/utils/run-locally/src/main.rs b/rust/main/utils/run-locally/src/main.rs index eab7cb8081..357ebe06ec 100644 --- a/rust/main/utils/run-locally/src/main.rs +++ b/rust/main/utils/run-locally/src/main.rs @@ -16,7 +16,7 @@ //! - `SEALEVEL_ENABLED`: true/false, enables sealevel testing. Defaults to true. use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, fs::{self, File}, path::Path, process::{Child, ExitCode}, @@ -33,8 +33,9 @@ use logging::log; pub use metrics::fetch_metric; use once_cell::sync::Lazy; use program::Program; +use relayer::msg::pending_message::RETRIEVED_MESSAGE_LOG; use tempfile::{tempdir, TempDir}; -use utils::get_matched_lines; +use utils::get_matching_lines; use crate::{ config::Config, @@ -435,7 +436,7 @@ fn main() -> ExitCode { let mut failure_occurred = false; let starting_relayer_balance: f64 = agent_balance_sum(9092).unwrap(); - while !SHUTDOWN.load(Ordering::Relaxed) { + 'main_loop: while !SHUTDOWN.load(Ordering::Relaxed) { if config.ci_mode { // for CI we have to look for the end condition. if termination_invariants_met( @@ -469,13 +470,15 @@ fn main() -> ExitCode { status.code().unwrap() ); failure_occurred = true; - SHUTDOWN.store(true, Ordering::Relaxed); - break; + break 'main_loop; } } } sleep(Duration::from_secs(5)); } + + // Here we want to restart the relayer and validate + // its restart behaviour. restart_relayer(&config, &mut state, &rocks_db_dir); // give relayer a chance to fully restart. @@ -486,12 +489,12 @@ fn main() -> ExitCode { SHUTDOWN.store(false, Ordering::Relaxed); while !SHUTDOWN.load(Ordering::Relaxed) { if config.ci_mode { - if message_retrieval_invariants_met().unwrap_or(false) { + if relayer_restart_invariants_met().unwrap_or(false) { // end condition reached successfully break; } else if (Instant::now() - loop_start).as_secs() > config.ci_mode_timeout { // we ran out of time - log!("CI timeout reached before messages where retrieved from db"); + log!("CI timeout reached before relayer restart invariants were met"); failure_occurred = true; break; } @@ -576,45 +579,51 @@ fn create_relayer(config: &Config, rocks_db_dir: &TempDir) -> Program { } } +/// Kills relayer in State and respawns the relayer again fn restart_relayer(config: &Config, state: &mut State, rocks_db_dir: &TempDir) { + log!("Stopping relayer..."); let (child, _) = state.agents.get_mut("RLY").expect("No relayer agent found"); - - child.kill().expect("Failed to kill relayer"); - - let relayer_env = create_relayer(config, rocks_db_dir); + child.kill().expect("Failed to stop relayer"); log!("Restarting relayer..."); + let relayer_env = create_relayer(config, rocks_db_dir); state.push_agent(relayer_env.spawn("RLY", Some(&AGENT_LOGGING_DIR))); + log!("Restarted relayer..."); } -fn message_retrieval_invariants_met() -> eyre::Result { +/// Check relayer restart behaviour is correct. +/// So far, we only check if undelivered messages' statuses +/// are correctly retrieved from the database +fn relayer_restart_invariants_met() -> eyre::Result { let log_file_path = AGENT_LOGGING_DIR.join("RLY-output.log"); let relayer_logfile = File::open(log_file_path).unwrap(); log!("Checking message statuses were retrieved from logs..."); + let matched_logs = get_matching_lines( + &relayer_logfile, + &vec![vec![ + RETRIEVED_MESSAGE_LOG.to_string(), + "CouldNotFetchMetadata".to_string(), + ]], + ); - const RETRIEVED_MESSGE_LOG: &str = "Message status retrieved from db"; - let matched_logs = get_matched_lines(&relayer_logfile, RETRIEVED_MESSGE_LOG); - - let no_metadata_message_ids: HashSet<_> = matched_logs - .iter() - .filter(|s| s.contains("CouldNotFetchMetadata")) - .filter_map(|s| { - // string parse for id and remove duplicates by collecting - // into a hashset - s.rsplit_once("id\u{1b}[0m\u{1b}[2m=\u{1b}[0m\"") - .and_then(|(_, s2)| s2.split_once("\"")) - .map(|(id, _)| id) - }) - .collect(); - - let no_metadata_message_count = no_metadata_message_ids.len(); - if no_metadata_message_count < ZERO_MERKLE_INSERTION_KATHY_MESSAGES as usize { + let no_metadata_message_count = matched_logs[0]; + // These messages are never inserted into the merkle tree. + // So these messages will never be deliverable and will always + // be in a CouldNotFetchMetadata state. + // When the relayer restarts, these messages' statuses should be + // retrieved from the database with CouldNotFetchMetadata status. + if no_metadata_message_count < ZERO_MERKLE_INSERTION_KATHY_MESSAGES { + log!( + "No metadata message count is {}, expected {}", + no_metadata_message_count, + ZERO_MERKLE_INSERTION_KATHY_MESSAGES + ); return Ok(false); } assert_eq!( no_metadata_message_count, - ZERO_MERKLE_INSERTION_KATHY_MESSAGES as usize + ZERO_MERKLE_INSERTION_KATHY_MESSAGES ); Ok(true) } diff --git a/rust/main/utils/run-locally/src/utils.rs b/rust/main/utils/run-locally/src/utils.rs index 8888cf35c0..c1d2503c7a 100644 --- a/rust/main/utils/run-locally/src/utils.rs +++ b/rust/main/utils/run-locally/src/utils.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::fs::File; use std::io::{self, BufRead}; use std::path::{Path, PathBuf}; @@ -119,30 +118,24 @@ pub fn stop_child(child: &mut Child) { }; } -pub fn get_matching_lines(file: &File, search_strings: &[&str]) -> HashMap { +pub fn get_matching_lines(file: &File, search_strings: &[Vec]) -> Vec { let reader = io::BufReader::new(file); - let mut matches = HashMap::new(); + let mut matches = vec![0; search_strings.len()]; let mut lines = reader.lines(); while let Some(Ok(line)) = lines.next() { - search_strings.iter().for_each(|search_string| { - if line.contains(search_string) { - let count = matches.entry(search_string.to_string()).or_insert(0); - *count += 1; - } - }); + search_strings + .iter() + .enumerate() + .for_each(|(i, search_string_vec)| { + if search_string_vec + .iter() + .map(|search_string| line.contains(search_string)) + .fold(true, |acc, x| acc && x) + { + matches[i] += 1; + } + }); } matches } - -pub fn get_matched_lines(file: &File, search_string: &str) -> Vec { - let reader = io::BufReader::new(file); - let mut lines = reader.lines(); - let mut matched_lines = Vec::new(); - while let Some(Ok(line)) = lines.next() { - if line.contains(search_string) { - matched_lines.push(line); - } - } - matched_lines -}