Merge branch 'VER-2324-stream-uvm-logs-in-test-driver' into 'master'
Stream uvm logs in the test-driver

The test-driver can now [stream the logs of all deployed uvms](https://dfinity.atlassian.net/browse/VER-2324).
This should improve the debugging experience, at the cost of a larger overall log size.

[Here](https://dash.idx.dfinity.network/invocation/78fd727c-6e53-41c3-9223-16ccc965c077?target=%2F%2Frs%2Ftests%2Fnetworking%3Acanister_http_test) is an example of the `canister_http_test` log:
```
some logs ...
...
// test-runner discovers a uvm in the env and starts streaming Journald entries
2023-06-21 09:33:04.920 INFO[uvms_logs_stream:rs/tests/src/driver/group.rs:563:0] Streaming Journald for newly discovered uvm=httpbin with ipv6=2a0b:21c0:4003:2:50dc:b2ff:fe21:a0ae
...
// entries in Journald haven't appeared yet ...
2023-06-21 09:33:24.202 WARN[uvms_logs_stream:rs/tests/src/driver/group.rs:999:0] All entries of Journald are read to completion. Streaming Journald will start again in 5 sec ...
// now entries in Journald appeared and are being streamed 
2023-06-21 09:33:29.369 INFO[uvms_logs_stream:StdOut] [uvm=httpbin] JournalRecord {message: "kvm-clock: cpu 1, msr 57f3bb041, secondary cpu clock"}
2023-06-21 09:33:29.395 INFO[uvms_logs_stream:StdOut] [uvm=httpbin] JournalRecord {message: "pcieport 0000:00:01.5: PME: Signaling with IRQ 29"}
...
```
Around 1800 `[uvm=httpbin]` Journald entries are present in the log.

Another important application of this feature is running `colocated` tests.
The test-runner deploys a single `[uvm=test-driver]` and streams all of its logs. The `[uvm=test-driver]` in turn streams the logs of all uvms deployed within the test, e.g. `[uvm=httpbin]`. Thus, the test-runner ultimately displays all the logs. Here is an example for [canister_http_test_colocate](https://dash.idx.dfinity.network/invocation/b19ad835-92a0-42df-a737-d886d9f3c6b5?target=%2F%2Frs%2Ftests%2Fnetworking%3Acanister_http_test_colocate):

```
some logs ...
...
// test-runner discovers the uvm in the env and starts streaming Journald entries
2023-06-21 08:58:21.476 INFO[uvms_logs_stream:rs/tests/src/driver/group.rs:563:0] Streaming Journald for newly discovered uvm=test-driver with ipv6=2600:c00:2:100:50af:81ff:fe16:a072
...
// example: log of the [uvm=test-driver]
2023-06-21 08:58:39.537 INFO[uvms_logs_stream:StdOut] [uvm=test-driver] JournalRecord {message: "Linux version 5.15.79 (nixbld@localhost) (gcc (GCC) 11.3.0, GNU ld (GNU Binutils) 2.39) #1-NixOS SMP Wed Nov 16 08:58:31 UTC 2022"}
...
// example: normal log of the test, thus `TEST_LOG` tag.
2023-06-21 09:01:09.412 INFO[uvms_logs_stream:StdOut] [uvm=test-driver] TEST_LOG: 2023-06-21 09:01:09.025 INFO[canister_http::http_basic::test:rs/tests/src/canister_http/http_basic.rs:84:0] Update call succeeded! Ok(RemoteHttpResponse { status: 200, headers: [], body: "<!DOCTYPE html>\n<html lang=\"en\">\n<head>\n  <title>httpbin</title>\n</head>\n<body>\n  <h1>httpbin</h1>\n</body>\n</html>There is context to be appended in body" })
...
// example: log of the [uvm=httpbin], which is first streamed to [uvm=test-driver] and then to the test-runner, thus nested:
2023-06-21 08:59:54.600 INFO[uvms_logs_stream:StdOut] [uvm=test-driver] TEST_LOG: 2023-06-21 08:59:53.925 INFO[uvms_logs_stream:StdOut] [uvm=httpbin] JournalRecord {message: "BIOS-e820: [mem 0x0000000000000000-0x000000000009ffff] usable"}
```
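The `TEST_LOG` prefix in the lines above is produced by the record formatter: entries originating from the colocated test container are re-emitted verbatim under a `TEST_LOG:` tag, while all other journald entries keep the `JournalRecord {...}` wrapper. A minimal sketch of that decision (simplified from the `JournalRecord` `Display` impl in `group.rs`; the field set is reduced for illustration):

```rust
const COLOCATE_CONTAINER_NAME: &str = "system_test";

struct JournalRecord {
    message: String,
    container_name: Option<String>,
}

impl std::fmt::Display for JournalRecord {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Lines produced by the colocated test container are re-emitted
        // verbatim under a TEST_LOG prefix, so nested streams stay readable.
        if self.container_name.as_deref() == Some(COLOCATE_CONTAINER_NAME) {
            return write!(f, "TEST_LOG: {}", self.message);
        }
        write!(f, "JournalRecord {{message: \"{}\"}}", self.message)
    }
}

fn main() {
    let nested = JournalRecord {
        message: "Update call succeeded!".into(),
        container_name: Some(COLOCATE_CONTAINER_NAME.into()),
    };
    let kernel = JournalRecord {
        message: "kvm-clock: cpu 1".into(),
        container_name: None,
    };
    assert_eq!(nested.to_string(), "TEST_LOG: Update call succeeded!");
    assert_eq!(kernel.to_string(), "JournalRecord {message: \"kvm-clock: cpu 1\"}");
}
```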

Implementation remarks:
- A separate background `uvms_logs_stream_task` is created, which is the parent of the `keepalive_task`.
- uvm discovery is retried and performed across the whole env folder, so uvms can be deployed in setup, test, or any other task.
- uvm log streaming is retried on errors and whenever the logs have been read to completion.
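The discovery dedup can be sketched as follows — repeated discovery passes may return already-known uvms, so a map keyed by uvm name ensures the per-uvm streaming task is spawned at most once (an illustrative simplification of `group.rs`, not the exact code; `spawn` stands in for `rt.spawn(stream_journald_with_retries(...))`):

```rust
use std::collections::HashMap;
use std::net::Ipv6Addr;

fn spawn_stream_once(
    streamed: &mut HashMap<String, Ipv6Addr>,
    discovered: HashMap<String, Ipv6Addr>,
    spawn: &mut impl FnMut(&str, Ipv6Addr),
) {
    for (name, ip) in discovered {
        // entry().or_insert_with() runs the spawn callback only for new names.
        streamed.entry(name.clone()).or_insert_with(|| {
            spawn(&name, ip);
            ip
        });
    }
}

fn main() {
    let mut streamed = HashMap::new();
    let mut spawned: Vec<String> = Vec::new();
    let ip: Ipv6Addr = "::1".parse().unwrap();
    let discovered: HashMap<String, Ipv6Addr> = [("httpbin".to_string(), ip)].into();
    let mut spawn = |name: &str, _ip: Ipv6Addr| spawned.push(name.to_string());
    spawn_stream_once(&mut streamed, discovered.clone(), &mut spawn);
    spawn_stream_once(&mut streamed, discovered, &mut spawn); // second pass: no-op
    assert_eq!(spawned, vec!["httpbin".to_string()]);
}
```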

TODOs:
- Limit the max number of Journald entries to avoid overflow 
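One possible shape for this TODO — an illustrative assumption, not something implemented in this MR — is a per-uvm cap with a single truncation marker once it is hit:

```rust
// Hypothetical cap on re-emitted journald entries per uvm; the value and the
// drop policy are assumptions for illustration (small here for the demo).
const MAX_ENTRIES_PER_UVM: usize = 3;

struct CappedEmitter {
    emitted: usize,
    truncated: bool,
}

impl CappedEmitter {
    fn new() -> Self {
        Self { emitted: 0, truncated: false }
    }

    /// Returns the line to print, a one-off truncation marker, or None.
    fn emit(&mut self, line: &str) -> Option<String> {
        if self.emitted < MAX_ENTRIES_PER_UVM {
            self.emitted += 1;
            Some(line.to_string())
        } else if !self.truncated {
            self.truncated = true;
            Some(format!("... truncated after {MAX_ENTRIES_PER_UVM} entries"))
        } else {
            None
        }
    }
}

fn main() {
    let mut em = CappedEmitter::new();
    let out: Vec<String> = (0..10).filter_map(|i| em.emit(&format!("entry {i}"))).collect();
    assert_eq!(out.len(), 4); // 3 entries + 1 truncation marker
    assert_eq!(out[3], "... truncated after 3 entries");
}
```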

See merge request dfinity-lab/public/ic!12591
nikolay-komarevskiy committed Jul 2, 2023
2 parents 12ef5e1 + ef1bd7e commit 7742d96
Showing 3 changed files with 289 additions and 113 deletions.
258 changes: 241 additions & 17 deletions rs/tests/src/driver/group.rs
@@ -1,10 +1,9 @@
#![allow(dead_code)]
#[rustfmt::skip]

use std::path::PathBuf;

use walkdir::WalkDir;
use crate::driver::{
farm::{Farm, HostFeature},
resource::AllocatedVm,
task_scheduler::TaskScheduler,
test_env_api::{FarmBaseUrl, HasGroupSetup},
{
@@ -25,33 +24,46 @@ use crate::driver::{
test_setup::GroupSetup,
};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use clap::Parser;
use tokio::runtime::{Builder, Handle, Runtime};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpSocket,
runtime::{Builder, Handle, Runtime},
};

use crate::driver::{
constants::{kibana_link, GROUP_TTL, KEEPALIVE_INTERVAL},
report::{SystemGroupSummary, SystemTestGroupError},
subprocess_task::SubprocessTask,
task::{SkipTestTask, Task},
timeout::TimeoutTask,
};
use std::{collections::BTreeMap, iter::once, time::Duration};

use slog::{debug, info, trace, Logger};
use std::{
collections::{BTreeMap, HashMap},
iter::once,
net::Ipv6Addr,
time::Duration,
};

use super::report::{SystemGroupSummary, SystemTestGroupError};
use slog::{debug, error, info, trace, warn, Logger};

const DEFAULT_TIMEOUT_PER_TEST: Duration = Duration::from_secs(60 * 10); // 10 minutes
const DEFAULT_OVERALL_TIMEOUT: Duration = Duration::from_secs(60 * 10); // 10 minutes
pub const MAX_RUNTIME_THREADS: usize = 16;
pub const MAX_RUNTIME_BLOCKING_THREADS: usize = 16;
const RETRY_DELAY_JOURNALD_STREAM: Duration = Duration::from_secs(5);
const RETRY_DELAY_DISCOVER_UVMS: Duration = Duration::from_secs(5);

const DEBUG_KEEPALIVE_TASK_NAME: &str = "debug_keepalive";
const REPORT_TASK_NAME: &str = "report";
const KEEPALIVE_TASK_NAME: &str = "keepalive";
const UVMS_LOGS_STREAM_TASK_NAME: &str = "uvms_logs_stream";
const SETUP_TASK_NAME: &str = "setup";
const LIFETIME_GUARD_TASK_PREFIX: &str = "lifetime_guard_";
pub const COLOCATE_CONTAINER_NAME: &str = "system_test";

#[derive(Parser, Debug)]
pub struct CliArgs {
@@ -168,7 +180,7 @@ impl TestEnvAttribute for SetupResult {
}

pub fn is_task_visible_to_user(task_id: &TaskId) -> bool {
matches!(task_id, TaskId::Test(task_name) if task_name.ne(REPORT_TASK_NAME) && task_name.ne(KEEPALIVE_TASK_NAME) && !task_name.starts_with(LIFETIME_GUARD_TASK_PREFIX) && !task_name.starts_with("dummy("))
matches!(task_id, TaskId::Test(task_name) if task_name.ne(REPORT_TASK_NAME) && task_name.ne(KEEPALIVE_TASK_NAME) && task_name.ne(UVMS_LOGS_STREAM_TASK_NAME) && !task_name.starts_with(LIFETIME_GUARD_TASK_PREFIX) && !task_name.starts_with("dummy("))
}

pub struct ComposeContext<'a> {
@@ -524,6 +536,66 @@ impl SystemTestGroup {
timeout_per_test: self.effective_timeout_per_test(),
};

let uvms_logs_stream_task_id = TaskId::Test(String::from(UVMS_LOGS_STREAM_TASK_NAME));
let uvms_logs_stream_task = {
Box::from(subproc(
uvms_logs_stream_task_id,
{
let logger = group_ctx.logger().clone();
let group_ctx = group_ctx.clone();
move || {
let rt: Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.max_blocking_threads(1)
.enable_all()
.build()
.unwrap_or_else(|err| {
panic!("Could not create tokio runtime: {}", err)
});
let root_search_dir = {
let root_env = group_ctx
.clone()
.get_root_env()
.expect("root_env should already exist");
let base_path = root_env.base_path();
base_path
.parent()
.expect("root_env dir should have a parent dir")
.to_path_buf()
};
let mut streamed_uvms: HashMap<String, Ipv6Addr> = HashMap::new();
debug!(logger, ">>> {UVMS_LOGS_STREAM_TASK_NAME}");
loop {
match discover_uvms(root_search_dir.clone()) {
Ok(discovered_uvms) => {
for (key, value) in discovered_uvms {
streamed_uvms.entry(key.clone()).or_insert_with(|| {
let logger = logger.clone();
info!(
logger,
"Streaming Journald for newly discovered [uvm={key}] with ipv6={value}"
);
// The task starts, but the handle is never joined.
let _ = rt.spawn(stream_journald_with_retries(logger, key.clone(), value));
value
});
}
}
Err(err) => {
warn!(
logger,
"Discovering deployed uvms failed with err:{err}"
);
}
}
std::thread::sleep(RETRY_DELAY_DISCOVER_UVMS);
}
}
},
&mut compose_ctx,
)) as Box<dyn Task>
};

// The ID of the root task is needed outside this function for awaiting when the plan execution finishes.
let keepalive_task_id = TaskId::Test(String::from(KEEPALIVE_TASK_NAME));
let keepalive_task = if self.with_farm && !group_ctx.no_farm_keepalive {
@@ -602,7 +674,7 @@ impl SystemTestGroup {

// normal case: no debugkeepalive, overall timeout is active
if !group_ctx.debug_keepalive {
let plan = compose(
let keepalive_plan = compose(
Some(keepalive_task),
EvalOrder::Sequential,
vec![compose(
@@ -620,24 +692,31 @@
&mut compose_ctx,
);

let plan = Ok(compose(
let uvms_stream_plan = compose(
Some(uvms_logs_stream_task),
EvalOrder::Sequential,
vec![keepalive_plan],
&mut compose_ctx,
);

let report_plan = Ok(compose(
Some(Box::new(EmptyTask::new(TaskId::Test(
REPORT_TASK_NAME.to_string(),
)))),
EvalOrder::Sequential,
vec![if let Some(overall_timeout) = self.overall_timeout {
timed(
plan,
uvms_stream_plan,
overall_timeout,
Some(String::from("::group")),
&mut compose_ctx,
)
} else {
plan
uvms_stream_plan
}],
&mut compose_ctx,
));
return plan;
return report_plan;
}

// otherwise: keepalive needs to be above report task. no overall timeout.
@@ -649,7 +728,14 @@
)),
};

let plan = compose(
let uvms_stream_plan = compose(
Some(uvms_logs_stream_task),
EvalOrder::Sequential,
vec![keepalive_plan],
&mut compose_ctx,
);

let report_plan = compose(
Some(Box::new(EmptyTask::new(TaskId::Test(
REPORT_TASK_NAME.to_string(),
)))),
@@ -672,7 +758,7 @@
Ok(compose(
Some(keepalive_task),
EvalOrder::Parallel,
vec![plan, keepalive_plan],
vec![report_plan, uvms_stream_plan],
&mut compose_ctx,
))
}
@@ -861,3 +947,141 @@ fn print_report(ctx: &GroupContext, report: &SystemGroupSummary) {
);
};
}

#[derive(Debug, Deserialize)]
struct JournalRecord {
#[serde(rename = "__CURSOR")]
cursor: String,
#[serde(rename = "MESSAGE")]
message: String,
#[serde(rename = "_SYSTEMD_UNIT")]
system_unit: Option<String>,
#[serde(rename = "CONTAINER_NAME")]
container_name: Option<String>,
#[serde(rename = "_COMM")]
comm: Option<String>,
}

impl std::fmt::Display for JournalRecord {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(ref container) = self.container_name {
if container == COLOCATE_CONTAINER_NAME {
return write!(f, "TEST_LOG: {}", self.message);
}
}
let mut display = format!("message: \"{}\"", self.message);
if let Some(x) = &self.system_unit {
display += format!(", system_unit: \"{}\"", x).as_str()
}
if let Some(x) = &self.container_name {
display += format!(", container_name: \"{}\"", x).as_str()
}
if let Some(x) = &self.comm {
display += format!(", comm: \"{}\"", x).as_str()
}
write!(f, "JournalRecord {{{display}}}")
}
}

fn discover_uvms(root_path: PathBuf) -> Result<HashMap<String, Ipv6Addr>> {
let mut uvms: HashMap<String, Ipv6Addr> = HashMap::new();
for entry in WalkDir::new(root_path)
.into_iter()
.filter_map(Result::ok)
.filter(|e| {
let file_name = String::from(e.file_name().to_string_lossy());
e.file_type().is_file() && file_name == "vm.json"
})
.map(|e| e.path().to_owned())
{
let file =
std::fs::File::open(&entry).with_context(|| format!("Could not open: {:?}", &entry))?;
let vm: AllocatedVm = serde_json::from_reader(file)
.with_context(|| format!("{:?}: Could not read json.", &entry))?;
uvms.insert(vm.name.to_string(), vm.ipv6);
}
Ok(uvms)
}

async fn stream_journald_with_retries(logger: slog::Logger, uvm_name: String, ipv6: Ipv6Addr) {
// Start streaming Journald from the very beginning, which corresponds to the cursor="".
let mut cursor = Cursor::Start;
loop {
// In normal scenarios, i.e. without errors/interrupts, the function below should never return.
// In case it returns unexpectedly, we restart reading logs from the checkpoint cursor.
let (cursor_next, result) =
stream_journald_from_cursor(uvm_name.clone(), ipv6, cursor).await;
cursor = cursor_next;
if let Err(err) = result {
error!(
logger,
"Streaming Journald for uvm={uvm_name} with ipv6={ipv6} failed with: {err}"
);
}
// Should we stop reading Journald here?
warn!(
logger,
"All entries of Journald are read to completion. Streaming Journald will start again in {} sec ...",
RETRY_DELAY_JOURNALD_STREAM.as_secs()
);
tokio::time::sleep(RETRY_DELAY_JOURNALD_STREAM).await;
}
}

enum Cursor {
Start,
Position(String),
}

impl std::fmt::Display for Cursor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Cursor::Start => write!(f, ""),
Cursor::Position(x) => write!(f, "{}", x),
}
}
}

macro_rules! unwrap_or_return {
( $val1:expr, $val2:expr ) => {
match $val2 {
Ok(x) => x,
Err(x) => return ($val1, Err(x.into())),
}
};
}

async fn stream_journald_from_cursor(
uvm_name: String,
ipv6: Ipv6Addr,
mut cursor: Cursor,
) -> (Cursor, anyhow::Result<()>) {
let socket_addr = std::net::SocketAddr::new(ipv6.into(), 19531);
let socket = unwrap_or_return!(cursor, TcpSocket::new_v6());
let mut stream = unwrap_or_return!(cursor, socket.connect(socket_addr).await);
unwrap_or_return!(
cursor,
stream.write_all(b"GET /entries?follow HTTP/1.1\n").await
);
unwrap_or_return!(
cursor,
stream.write_all(b"Accept: application/json\n").await
);
unwrap_or_return!(
cursor,
stream
.write_all(format!("Range: entries={cursor}:0:\n\r\n\r").as_bytes())
.await
);
let buf_reader = BufReader::new(stream);
let mut lines = buf_reader.lines();
while let Some(line) = unwrap_or_return!(cursor, lines.next_line().await) {
let record_result: Result<JournalRecord, serde_json::Error> = serde_json::from_str(&line);
if let Ok(record) = record_result {
println!("[uvm={uvm_name}] {record}");
// We update the cursor value, so that in case the function errors, journald entries can be streamed from this checkpoint.
cursor = Cursor::Position(record.cursor);
}
}
(cursor, Ok(()))
}
3 changes: 2 additions & 1 deletion rs/tests/src/driver/universal_vm.rs
@@ -50,7 +50,7 @@ pub enum UniversalVmConfig {
Img(PathBuf),
}

const UNIVERSAL_VMS_DIR: &str = "universal_vms";
pub const UNIVERSAL_VMS_DIR: &str = "universal_vms";
const CONF_IMG_FNAME: &str = "config_disk.img.zst";
const CONF_SSH_IMG_FNAME: &str = "config_ssh_disk.img.zst";

@@ -304,6 +304,7 @@ fn setup_ssh(env: &TestEnv, config_dir: PathBuf) -> Result<()> {
Ok(())
}

#[derive(Debug)]
pub struct DeployedUniversalVm {
env: TestEnv,
name: String,