refactor get_matching_lines to take in a slice of Vec<String>
 - gate debug messages behind "test-utils" feature
 - rename message_retrieval_invariants_met to relayer_restart_invariants_met
 - add comment on message_retry termination invariants
kamiyaa committed Jan 24, 2025
1 parent 62d60c6 commit 98499ab
Showing 6 changed files with 75 additions and 66 deletions.
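A toy, in-memory sketch of the matching semantics the commit title describes (not the run-locally implementation, which reads from a log file — see the utils.rs diff below): each inner `Vec<String>` is a group of substrings that must all appear on the same line, and one count is returned per group, in input order.

```rust
fn main() {
    // Each inner Vec<String> is an AND-group: every substring in the group must
    // appear on a line for that group's counter to increment.
    let search_groups: Vec<Vec<String>> = vec![
        vec!["retrieved from db".to_string()],
        vec![
            "retrieved from db".to_string(),
            "CouldNotFetchMetadata".to_string(),
        ],
    ];
    let log_lines = [
        "Message status retrieved from db status=CouldNotFetchMetadata",
        "Message status retrieved from db status=Delivered",
    ];

    let mut counts = vec![0u32; search_groups.len()];
    for line in log_lines {
        for (i, group) in search_groups.iter().enumerate() {
            if group.iter().all(|s| line.contains(s.as_str())) {
                counts[i] += 1;
            }
        }
    }
    // Counts come back positionally, one per group.
    assert_eq!(counts, vec![2, 1]);
}
```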
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/lib.rs
@@ -1,5 +1,6 @@
+pub mod msg;
+
 mod merkle_tree;
-mod msg;
 mod processor;
 mod prover;
 mod relayer;
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/msg/mod.rs
@@ -32,7 +32,8 @@ pub(crate) mod gas_payment;
 pub(crate) mod metadata;
 pub(crate) mod op_queue;
 pub(crate) mod op_submitter;
-pub(crate) mod pending_message;
 pub(crate) mod processor;
 
+pub mod pending_message;
+
 pub use gas_payment::GAS_EXPENDITURE_LOG_MESSAGE;
7 changes: 6 additions & 1 deletion rust/main/agents/relayer/src/msg/pending_message.rs
@@ -36,6 +36,8 @@ pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) {
     Duration::from_secs(60 * 10)
 };
 
+pub const RETRIEVED_MESSAGE_LOG: &str = "Message status retrieved from db";
+
 /// The message context contains the links needed to submit a message. Each
 /// instance is for a unique origin -> destination pairing.
 pub struct MessageContext {
@@ -513,14 +515,17 @@ impl PendingMessage {
         // Attempt to fetch status about message from database
         let message_status = match ctx.origin_db.retrieve_status_by_message_id(&message.id()) {
             Ok(Some(status)) => {
+                #[cfg(feature = "test-utils")]
                 tracing::debug!(
                     ?status,
                     id = format!("{:x}", message.id()),
-                    "Message status retrieved from db"
+                    "{}",
+                    RETRIEVED_MESSAGE_LOG,
                 );
                 status
             }
             _ => {
+                #[cfg(feature = "test-utils")]
                 tracing::debug!("Message status not found in db");
                 PendingOperationStatus::FirstPrepareAttempt
             }
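The `#[cfg(feature = "test-utils")]` attributes mean these debug statements are compiled in only when the `test-utils` feature is enabled (as it is for the e2e run) and drop out of ordinary relayer builds. A minimal standalone sketch of that gating pattern, assuming a crate that declares a `test-utils` feature; the function and values are made up, not relayer code:

```rust
// Assumes the crate's Cargo.toml declares:
// [features]
// test-utils = []

fn lookup(id: u64) -> Option<&'static str> {
    let status = if id == 1 { Some("delivered") } else { None };
    // This statement only exists in builds made with `--features test-utils`;
    // without the feature, the compiler removes it entirely.
    #[cfg(feature = "test-utils")]
    println!("status for {id}: {status:?}");
    status
}

fn main() {
    let _ = lookup(1);
}
```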
24 changes: 12 additions & 12 deletions
@@ -91,11 +91,11 @@ pub fn termination_invariants_met(
 
     let relayer_logfile = File::open(log_file_path)?;
     let invariant_logs = &[
-        STORING_NEW_MESSAGE_LOG_MESSAGE,
-        LOOKING_FOR_EVENTS_LOG_MESSAGE,
-        GAS_EXPENDITURE_LOG_MESSAGE,
-        HYPER_INCOMING_BODY_LOG_MESSAGE,
-        TX_ID_INDEXING_LOG_MESSAGE,
+        vec![STORING_NEW_MESSAGE_LOG_MESSAGE.to_string()],
+        vec![LOOKING_FOR_EVENTS_LOG_MESSAGE.to_string()],
+        vec![GAS_EXPENDITURE_LOG_MESSAGE.to_string()],
+        vec![HYPER_INCOMING_BODY_LOG_MESSAGE.to_string()],
+        vec![TX_ID_INDEXING_LOG_MESSAGE.to_string()],
     ];
     let log_counts = get_matching_lines(&relayer_logfile, invariant_logs);
     // Zero insertion messages don't reach `submit` stage where gas is spent, so we only expect these logs for the other messages.
@@ -105,37 +105,37 @@
     // EDIT: Having had a quick look, it seems like there are some legitimate reverts happening in the confirm step
     // (`Transaction attempting to process message either reverted or was reorged`)
     // in which case more gas expenditure logs than messages are expected.
-    let gas_expenditure_log_count = log_counts.get(GAS_EXPENDITURE_LOG_MESSAGE).unwrap();
+    let gas_expenditure_log_count = log_counts[2];
     assert!(
-        gas_expenditure_log_count >= &total_messages_expected,
+        gas_expenditure_log_count >= total_messages_expected,
         "Didn't record gas payment for all delivered messages. Got {} gas payment logs, expected at least {}",
         gas_expenditure_log_count,
         total_messages_expected
     );
     // These tests check that we fixed https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3915, where some logs would not show up
     assert!(
-        log_counts.get(STORING_NEW_MESSAGE_LOG_MESSAGE).unwrap() > &0,
+        gas_expenditure_log_count > 0,
         "Didn't find any logs about storing messages in db"
     );
     assert!(
-        log_counts.get(LOOKING_FOR_EVENTS_LOG_MESSAGE).unwrap() > &0,
+        log_counts[1] > 0,
         "Didn't find any logs about looking for events in index range"
     );
-    let total_tx_id_log_count = log_counts.get(TX_ID_INDEXING_LOG_MESSAGE).unwrap();
+    let total_tx_id_log_count = log_counts[4];
     assert!(
         // there are 3 txid-indexed events:
         // - relayer: merkle insertion and gas payment
         // - scraper: gas payment
         // some logs are emitted for multiple events, so requiring there to be at least
         // `config.kathy_messages` logs is a reasonable approximation, since all three of these events
         // are expected to be logged for each message.
-        *total_tx_id_log_count as u64 >= config.kathy_messages,
+        total_tx_id_log_count as u64 >= config.kathy_messages,
         "Didn't find as many tx id logs as expected. Found {} and expected {}",
         total_tx_id_log_count,
         config.kathy_messages
     );
     assert!(
-        log_counts.get(HYPER_INCOMING_BODY_LOG_MESSAGE).is_none(),
+        log_counts[3] == 0,
         "Verbose logs not expected at the log level set in e2e"
     );
 
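With the `HashMap` lookups gone, the bare indices into `log_counts` are positional: they follow the order of `invariant_logs` above, so `log_counts[0]` is the storing-new-message count, `[1]` looking-for-events, `[2]` gas-expenditure, `[3]` hyper-incoming-body, and `[4]` tx-id-indexing. A hedged sketch of one way to keep those indices readable; the destructuring and the sample numbers are illustrative, not part of the commit:

```rust
fn main() {
    // Stand-in for the Vec<u32> returned by get_matching_lines, in the same
    // order as `invariant_logs`: storing, looking-for-events, gas, hyper, tx-id.
    let log_counts: Vec<u32> = vec![10, 12, 9, 0, 30];

    // Hypothetical destructuring that avoids bare indices.
    let [storing, looking, gas, hyper, tx_id]: [u32; 5] = log_counts
        .try_into()
        .expect("one count per invariant log group");

    assert!(storing > 0 && looking > 0 && gas > 0);
    assert_eq!(hyper, 0);
    assert!(u64::from(tx_id) >= 10);
}
```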
69 changes: 39 additions & 30 deletions rust/main/utils/run-locally/src/main.rs
@@ -16,7 +16,7 @@
 //! - `SEALEVEL_ENABLED`: true/false, enables sealevel testing. Defaults to true.
 use std::{
-    collections::{HashMap, HashSet},
+    collections::HashMap,
     fs::{self, File},
     path::Path,
     process::{Child, ExitCode},
@@ -33,8 +33,9 @@ use logging::log;
 pub use metrics::fetch_metric;
 use once_cell::sync::Lazy;
 use program::Program;
+use relayer::msg::pending_message::RETRIEVED_MESSAGE_LOG;
 use tempfile::{tempdir, TempDir};
-use utils::get_matched_lines;
+use utils::get_matching_lines;
 
 use crate::{
     config::Config,
@@ -435,7 +436,7 @@ fn main() -> ExitCode {
     let mut failure_occurred = false;
     let starting_relayer_balance: f64 = agent_balance_sum(9092).unwrap();
 
-    while !SHUTDOWN.load(Ordering::Relaxed) {
+    'main_loop: while !SHUTDOWN.load(Ordering::Relaxed) {
         if config.ci_mode {
             // for CI we have to look for the end condition.
             if termination_invariants_met(
@@ -469,13 +470,15 @@
                         status.code().unwrap()
                     );
                     failure_occurred = true;
+                    SHUTDOWN.store(true, Ordering::Relaxed);
-                    break;
+                    break 'main_loop;
                 }
             }
         }
         sleep(Duration::from_secs(5));
     }
 
+    // Here we want to restart the relayer and validate
+    // its restart behaviour.
     restart_relayer(&config, &mut state, &rocks_db_dir);
 
     // give relayer a chance to fully restart.
@@ -486,12 +489,12 @@
     SHUTDOWN.store(false, Ordering::Relaxed);
     while !SHUTDOWN.load(Ordering::Relaxed) {
         if config.ci_mode {
-            if message_retrieval_invariants_met().unwrap_or(false) {
+            if relayer_restart_invariants_met().unwrap_or(false) {
                 // end condition reached successfully
                 break;
             } else if (Instant::now() - loop_start).as_secs() > config.ci_mode_timeout {
                 // we ran out of time
-                log!("CI timeout reached before messages where retrieved from db");
+                log!("CI timeout reached before relayer restart invariants were met");
                 failure_occurred = true;
                 break;
             }
@@ -576,45 +579,51 @@ fn create_relayer(config: &Config, rocks_db_dir: &TempDir) -> Program {
     }
 }
 
+/// Kills relayer in State and respawns the relayer again
 fn restart_relayer(config: &Config, state: &mut State, rocks_db_dir: &TempDir) {
     log!("Stopping relayer...");
     let (child, _) = state.agents.get_mut("RLY").expect("No relayer agent found");
-
-    child.kill().expect("Failed to kill relayer");
-
-    let relayer_env = create_relayer(config, rocks_db_dir);
+    child.kill().expect("Failed to stop relayer");
 
+    log!("Restarting relayer...");
+    let relayer_env = create_relayer(config, rocks_db_dir);
     state.push_agent(relayer_env.spawn("RLY", Some(&AGENT_LOGGING_DIR)));
     log!("Restarted relayer...");
 }
 
-fn message_retrieval_invariants_met() -> eyre::Result<bool> {
+/// Check relayer restart behaviour is correct.
+/// So far, we only check if undelivered messages' statuses
+/// are correctly retrieved from the database
+fn relayer_restart_invariants_met() -> eyre::Result<bool> {
     let log_file_path = AGENT_LOGGING_DIR.join("RLY-output.log");
     let relayer_logfile = File::open(log_file_path).unwrap();
 
+    log!("Checking message statuses were retrieved from logs...");
+    let matched_logs = get_matching_lines(
+        &relayer_logfile,
+        &vec![vec![
+            RETRIEVED_MESSAGE_LOG.to_string(),
+            "CouldNotFetchMetadata".to_string(),
+        ]],
+    );
 
-    const RETRIEVED_MESSGE_LOG: &str = "Message status retrieved from db";
-    let matched_logs = get_matched_lines(&relayer_logfile, RETRIEVED_MESSGE_LOG);
-
-    let no_metadata_message_ids: HashSet<_> = matched_logs
-        .iter()
-        .filter(|s| s.contains("CouldNotFetchMetadata"))
-        .filter_map(|s| {
-            // string parse for id and remove duplicates by collecting
-            // into a hashset
-            s.rsplit_once("id\u{1b}[0m\u{1b}[2m=\u{1b}[0m\"")
-                .and_then(|(_, s2)| s2.split_once("\""))
-                .map(|(id, _)| id)
-        })
-        .collect();
-
-    let no_metadata_message_count = no_metadata_message_ids.len();
-    if no_metadata_message_count < ZERO_MERKLE_INSERTION_KATHY_MESSAGES as usize {
+    let no_metadata_message_count = matched_logs[0];
+    // These messages are never inserted into the merkle tree.
+    // So these messages will never be deliverable and will always
+    // be in a CouldNotFetchMetadata state.
+    // When the relayer restarts, these messages' statuses should be
+    // retrieved from the database with CouldNotFetchMetadata status.
+    if no_metadata_message_count < ZERO_MERKLE_INSERTION_KATHY_MESSAGES {
         log!(
             "No metadata message count is {}, expected {}",
             no_metadata_message_count,
             ZERO_MERKLE_INSERTION_KATHY_MESSAGES
         );
         return Ok(false);
     }
     assert_eq!(
         no_metadata_message_count,
-        ZERO_MERKLE_INSERTION_KATHY_MESSAGES as usize
+        ZERO_MERKLE_INSERTION_KATHY_MESSAGES
     );
     Ok(true)
 }
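On the `break;` to `break 'main_loop;` change: the run of closing braces after the failure check suggests it sits inside nested blocks, likely an inner loop over the spawned agents, where an unlabeled `break` would only exit the innermost loop. A standalone sketch of labeled `break`; the names are illustrative, not relayer code:

```rust
fn main() {
    let mut polls = 0;
    'main_loop: loop {
        polls += 1;
        for agent_failed in [false, true] {
            if agent_failed {
                // A plain `break` would only leave the inner `for` loop;
                // the label exits the outer polling loop in one step.
                break 'main_loop;
            }
        }
    }
    assert_eq!(polls, 1);
}
```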
35 changes: 14 additions & 21 deletions rust/main/utils/run-locally/src/utils.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::fs::File;
 use std::io::{self, BufRead};
 use std::path::{Path, PathBuf};
@@ -119,30 +118,24 @@ pub fn stop_child(child: &mut Child) {
     };
 }
 
-pub fn get_matching_lines(file: &File, search_strings: &[&str]) -> HashMap<String, u32> {
+pub fn get_matching_lines(file: &File, search_strings: &[Vec<String>]) -> Vec<u32> {
     let reader = io::BufReader::new(file);
-    let mut matches = HashMap::new();
+    let mut matches = vec![0; search_strings.len()];
 
     let mut lines = reader.lines();
     while let Some(Ok(line)) = lines.next() {
-        search_strings.iter().for_each(|search_string| {
-            if line.contains(search_string) {
-                let count = matches.entry(search_string.to_string()).or_insert(0);
-                *count += 1;
-            }
-        });
+        search_strings
+            .iter()
+            .enumerate()
+            .for_each(|(i, search_string_vec)| {
+                if search_string_vec
+                    .iter()
+                    .map(|search_string| line.contains(search_string))
+                    .fold(true, |acc, x| acc && x)
+                {
+                    matches[i] += 1;
+                }
+            });
     }
     matches
 }
-
-pub fn get_matched_lines(file: &File, search_string: &str) -> Vec<String> {
-    let reader = io::BufReader::new(file);
-    let mut lines = reader.lines();
-    let mut matched_lines = Vec::new();
-    while let Some(Ok(line)) = lines.next() {
-        if line.contains(search_string) {
-            matched_lines.push(line);
-        }
-    }
-    matched_lines
-}
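A hedged test sketch exercising the refactored helper end to end with a real file handle, the way the e2e code does. It assumes it sits in utils.rs next to `get_matching_lines` and that the `tempfile` crate (already imported by run-locally's main.rs) is available to tests; the test name and log contents are made up:

```rust
#[cfg(test)]
mod get_matching_lines_tests {
    use super::get_matching_lines;
    use std::fs::File;
    use std::io::Write;

    #[test]
    fn counts_follow_group_order() {
        // Fake relayer log with three lines.
        let mut tmp = tempfile::NamedTempFile::new().expect("create temp file");
        writeln!(tmp, "Message status retrieved from db id=\"abc\" status=CouldNotFetchMetadata").unwrap();
        writeln!(tmp, "Message status retrieved from db id=\"def\" status=Delivered").unwrap();
        writeln!(tmp, "Storing new message in db").unwrap();
        tmp.flush().unwrap();

        let file = File::open(tmp.path()).expect("reopen temp file");
        let counts = get_matching_lines(
            &file,
            &[
                vec!["Storing new message in db".to_string()],
                vec![
                    "Message status retrieved from db".to_string(),
                    "CouldNotFetchMetadata".to_string(),
                ],
                vec!["not present in the log".to_string()],
            ],
        );
        // One slot per group, in input order; the last group matches nothing.
        assert_eq!(counts, vec![1, 1, 0]);
    }
}
```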
