From 385eb30ee4aa7d2952cdce8e5a2f94ab90841ead Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Wed, 28 Feb 2024 01:43:04 +0000 Subject: [PATCH] Add request ID to meta request failures and add tests A side effect of https://github.com/awslabs/mountpoint-s3/pull/669 was that there's now no way to get request IDs for failed requests at the default logging settings, as only DEBUG-level messages include the request IDs. This change adds request IDs to the meta request failure message when available, so that these WARN-level messages still include request IDs. I also added some new infrastructure to test metrics and log messages. For metrics, we build a new `metrics::Recorder` that collects all the metrics and can then be searched to find them. For log messages, we build a `tracing_subscriber::Layer` that collects all tracing events emitted while enabled. In both cases, the new objects aren't thread safe, as both `Recorder`s and `Layer`s are global state. So these tests need to continue to use `rusty_fork` to split into a new process per test. Signed-off-by: James Bornholt --- Cargo.lock | 1 + mountpoint-s3-client/Cargo.toml | 1 + mountpoint-s3-client/src/s3_crt_client.rs | 28 +- mountpoint-s3-client/tests/common/mod.rs | 11 +- .../tests/common/tracing_test.rs | 84 ++++++ mountpoint-s3-client/tests/metrics.rs | 273 ++++++++++++++++++ 6 files changed, 387 insertions(+), 11 deletions(-) create mode 100644 mountpoint-s3-client/tests/common/tracing_test.rs create mode 100644 mountpoint-s3-client/tests/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 86a03a74b..8325a2303 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2000,6 +2000,7 @@ dependencies = [ "metrics", "mountpoint-s3-client", "mountpoint-s3-crt", + "mountpoint-s3-crt-sys", "once_cell", "percent-encoding", "pin-project", diff --git a/mountpoint-s3-client/Cargo.toml b/mountpoint-s3-client/Cargo.toml index 1affcf6d2..5fe55217f 100644 --- a/mountpoint-s3-client/Cargo.toml +++ b/mountpoint-s3-client/Cargo.toml @@ -38,6 +38,7 @@ rand = { version = "0.8.5", optional = true } rand_chacha = { version = "0.3.1", optional = true } [dev-dependencies] +mountpoint-s3-crt-sys = { path = "../mountpoint-s3-crt-sys" } anyhow = { version = "1.0.64", features = ["backtrace"] } aws-config = "1.1.4" aws-credential-types = "1.1.4" diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index b89ff5da8..0819a8465 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -464,12 +464,12 @@ impl S3CrtClientInner { let range = metrics.response_headers().and_then(|headers| extract_range_header(&headers)); let message = if request_failure { - "CRT request failed" + "S3 request failed" } else { - "CRT request finished" + "S3 request finished" }; debug!(%request_type, ?crt_error, http_status, ?range, ?duration, ?ttfb, %request_id, "{}", message); - trace!(detailed_metrics=?metrics, "CRT request completed"); + trace!(detailed_metrics=?metrics, "S3 request completed"); let op = span_telemetry.metadata().map(|m| m.name()).unwrap_or("unknown"); if let Some(ttfb) = ttfb { @@ -535,11 +535,20 @@ impl S3CrtClientInner { Ok(t) } Err(maybe_err) => { + // Try to parse request header out of the failure. We can't just use the + // telemetry callback because there might be multiple requests per meta + // request, but these headers are known to be from the failed request. + let request_id = match &request_result.error_response_headers { + Some(headers) => headers.get("x-amz-request-id").map(|s| s.value().to_string_lossy().to_string()).ok(), + None => None, + }; + let request_id = request_id.unwrap_or_else(|| "".into()); + if let Some(error) = &maybe_err { - event!(log_level, ?duration, ?error, "meta request failed"); + event!(log_level, ?duration, %request_id, ?error, "meta request failed"); debug!("failed meta request result: {:?}", request_result); } else { - event!(log_level, ?duration, ?request_result, "meta request failed"); + event!(log_level, ?duration, %request_id, ?request_result, "meta request failed"); } // If it's not a real HTTP status, encode the CRT error in the metric instead @@ -1237,8 +1246,8 @@ mod tests { #[test] fn parse_no_signing_credential_error() { - // 6146 is crt error code for AWS_AUTH_SIGNING_NO_CREDENTIALS - let result = make_crt_error_result(0, 6146.into()); + let error_code = mountpoint_s3_crt_sys::aws_auth_errors::AWS_AUTH_SIGNING_NO_CREDENTIALS as i32; + let result = make_crt_error_result(0, error_code.into()); let result = try_parse_generic_error(&result); let Some(S3RequestError::NoSigningCredentials) = result else { panic!("wrong result, got: {:?}", result); @@ -1247,9 +1256,8 @@ mod tests { #[test] fn parse_test_other_crt_error() { - // 6144 is crt error code for AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM, which is another signing error, - // but not no signing credential error - let error_code = 6144; + // A signing error that isn't "no signing credentials" + let error_code = mountpoint_s3_crt_sys::aws_auth_errors::AWS_AUTH_SIGNING_UNSUPPORTED_ALGORITHM as i32; let result = make_crt_error_result(0, error_code.into()); let result = try_parse_generic_error(&result); let Some(S3RequestError::CrtError(error)) = result else { diff --git a/mountpoint-s3-client/tests/common/mod.rs b/mountpoint-s3-client/tests/common/mod.rs index 313574f16..cd51250c0 100644 --- a/mountpoint-s3-client/tests/common/mod.rs +++ b/mountpoint-s3-client/tests/common/mod.rs @@ -15,12 +15,21 @@ use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter; use rand::rngs::OsRng; use rand::RngCore; use std::ops::Range; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt as _; +use tracing_subscriber::{EnvFilter, Layer}; + +pub mod tracing_test; /// Enable tracing and CRT logging when running unit tests. #[ctor::ctor] fn init_tracing_subscriber() { let _ = RustLogAdapter::try_init(); - let _ = tracing_subscriber::fmt::try_init(); + + let subscriber = tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env())) + .with(self::tracing_test::TracingTestLayer::get()); + let _ = subscriber.try_init(); } pub enum AccessPointType { diff --git a/mountpoint-s3-client/tests/common/tracing_test.rs b/mountpoint-s3-client/tests/common/tracing_test.rs new file mode 100644 index 000000000..feb4e23cd --- /dev/null +++ b/mountpoint-s3-client/tests/common/tracing_test.rs @@ -0,0 +1,84 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +use once_cell::sync::Lazy; +use tracing::{Event, Level, Subscriber}; +use tracing_subscriber::field::VisitOutput; +use tracing_subscriber::fmt::format::{DefaultVisitor, Writer}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::Layer; + +static TRACING_TEST_LAYER: Lazy = Lazy::new(TracingTestLayer::new); + +/// This is a singleton [tracing::Layer] that can be used to write tests for log events. +/// +/// Use it like this: +/// ```rust +/// let _guard = TracingTestLayer::enable(); +/// // ... do work that emits tracing events ... +/// drop(_guard); +/// let events = TracingTestLayer::take_events(); +/// // events is a list of all events emitted +/// ``` +/// +/// THIS IS NOT THREAD SAFE! tracing doesn't give us a good way to separate threads, as tracing +/// subscribers are global state. You almost certainly want to use [rusty_fork] to write tests using +/// this layer. +#[derive(Debug, Default, Clone)] +pub struct TracingTestLayer { + inner: Arc, +} + +#[derive(Debug, Default)] +struct Inner { + events: Mutex>, + enabled: AtomicBool, +} + +impl TracingTestLayer { + fn new() -> Self { + Self { + inner: Arc::new(Inner { + events: Mutex::new(Vec::new()), + enabled: AtomicBool::new(false), + }), + } + } + + /// Get a handle to the singleton layer + pub fn get() -> Self { + TRACING_TEST_LAYER.clone() + } + + /// Start collecting tracing events, and stop collecting them when the returned guard drops. + #[must_use = "returns a guard that disables tracing when dropped"] + pub fn enable() -> TracingTestLayerEnableGuard { + TRACING_TEST_LAYER.inner.enabled.store(true, Ordering::SeqCst); + TracingTestLayerEnableGuard {} + } + + /// Take all the collected events + pub fn take_events() -> Vec<(Level, String)> { + TRACING_TEST_LAYER.inner.events.lock().unwrap().drain(..).collect() + } +} + +impl Layer for TracingTestLayer { + fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { + if self.inner.enabled.load(Ordering::SeqCst) { + let mut msg = String::new(); + let writer = Writer::new(&mut msg); + let visitor = DefaultVisitor::new(writer, true); + visitor.visit(event).unwrap(); + self.inner.events.lock().unwrap().push((*event.metadata().level(), msg)); + } + } +} + +pub struct TracingTestLayerEnableGuard; + +impl Drop for TracingTestLayerEnableGuard { + fn drop(&mut self) { + TRACING_TEST_LAYER.inner.enabled.store(false, Ordering::SeqCst); + } +} diff --git a/mountpoint-s3-client/tests/metrics.rs b/mountpoint-s3-client/tests/metrics.rs new file mode 100644 index 000000000..a8dbba65a --- /dev/null +++ b/mountpoint-s3-client/tests/metrics.rs @@ -0,0 +1,273 @@ +#![cfg(feature = "s3_tests")] + +//! Tests for metrics emitted by the client. +//! +//! Metrics tests are a bit annoying because metrics are emitted by different threads, and so the +//! [metrics] crate's thread-local recorders can't be used for testing. So our tests instead need to +//! fork (with [rusty_fork] so they can install a global recorder without interfering with each +//! other. + +pub mod common; + +use std::collections::HashMap; +use std::option::Option::None; +use std::sync::{Arc, Mutex}; + +use aws_sdk_s3::primitives::ByteStream; +use common::*; +use futures::TryStreamExt; +use metrics::{ + Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, Recorder, SharedString, Unit, +}; +use mountpoint_s3_client::error::ObjectClientError; +use mountpoint_s3_client::{ObjectClient, S3CrtClient, S3RequestError}; +use regex::Regex; +use rusty_fork::rusty_fork_test; +use tracing::Level; + +use crate::tracing_test::TracingTestLayer; + +/// A test metrics recorder that just remembers the current values of gauges and counters, and all +/// inserted values for histograms. +#[derive(Debug, Default, Clone)] +struct TestRecorder { + metrics: Arc>, +} + +#[derive(Debug, Default, Clone)] +struct Metrics { + metrics: HashMap>, +} + +impl Metrics { + fn get_all(&self, name: &str) -> impl Iterator)> { + let name = name.to_owned(); + self.metrics.iter().filter(move |(key, _)| key.name() == name) + } + + fn get_for_label(&self, name: &str, key: &str, value: &str) -> Option<&Metric> { + self.metrics + .iter() + .find(move |(k, _)| { + let label = k.labels().find(|label| label.key() == key && label.value() == value); + k.name() == name && label.is_some() + }) + .map(|(_, metric)| metric.as_ref()) + } +} + +impl Recorder for TestRecorder { + fn describe_counter(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + + fn describe_gauge(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + + fn describe_histogram(&self, _key: KeyName, _unit: Option, _description: SharedString) {} + + fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter { + let mut metrics = self.metrics.lock().unwrap(); + let metric = metrics + .metrics + .entry(key.clone()) + .or_insert(Arc::new(Metric::Counter(Default::default()))); + Counter::from_arc(metric.clone()) + } + + fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge { + let mut metrics = self.metrics.lock().unwrap(); + let metric = metrics + .metrics + .entry(key.clone()) + .or_insert(Arc::new(Metric::Gauge(Default::default()))); + Gauge::from_arc(metric.clone()) + } + + fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram { + let mut metrics = self.metrics.lock().unwrap(); + let metric = metrics + .metrics + .entry(key.clone()) + .or_insert(Arc::new(Metric::Histogram(Default::default()))); + Histogram::from_arc(metric.clone()) + } +} + +#[derive(Debug)] +enum Metric { + Histogram(Mutex>), + Counter(Mutex), + Gauge(Mutex), +} + +impl CounterFn for Metric { + fn increment(&self, value: u64) { + let Metric::Counter(counter) = self else { + panic!("expected counter"); + }; + *counter.lock().unwrap() += value; + } + + fn absolute(&self, value: u64) { + let Metric::Counter(counter) = self else { + panic!("expected counter"); + }; + *counter.lock().unwrap() = value; + } +} + +impl GaugeFn for Metric { + fn increment(&self, value: f64) { + let Metric::Gauge(gauge) = self else { + panic!("expected gauge"); + }; + *gauge.lock().unwrap() += value; + } + + fn decrement(&self, value: f64) { + let Metric::Gauge(gauge) = self else { + panic!("expected gauge"); + }; + *gauge.lock().unwrap() -= value; + } + + fn set(&self, value: f64) { + let Metric::Gauge(gauge) = self else { + panic!("expected gauge"); + }; + *gauge.lock().unwrap() = value; + } +} + +impl HistogramFn for Metric { + fn record(&self, value: f64) { + let Metric::Histogram(histogram) = self else { + panic!("expected histogram"); + }; + histogram.lock().unwrap().push(value); + } +} + +/// Test basic metrics emitted by get_object +async fn test_get_object_metrics() { + let sdk_client = get_test_sdk_client().await; + let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object_metrics"); + + let key = format!("{prefix}/test"); + let body = vec![0x42; 100]; + sdk_client + .put_object() + .bucket(&bucket) + .key(&key) + .body(ByteStream::from(body.clone())) + .send() + .await + .unwrap(); + + let recorder = TestRecorder::default(); + metrics::set_global_recorder(recorder.clone()).unwrap(); + + let client: S3CrtClient = get_test_client(); + let result = client + .get_object(&bucket, &key, None, None) + .await + .expect("get_object should succeed"); + let result = result + .map_ok(|(_offset, bytes)| bytes.len()) + .try_fold(0, |a, b| async move { Ok(a + b) }) + .await + .expect("get_object should succeed"); + assert_eq!(result, body.len()); + + let metrics = recorder.metrics.lock().unwrap().clone(); + + // Host count metric is emitted for a host that ends in amazonaws.com + let (key, host_count) = metrics + .get_all("s3.client.host_count") + .next() + .expect("host count metric should exist"); + let host = key + .labels() + .find(|l| l.key() == "host") + .expect("host label should exist"); + // TODO: this assertion won't work in other partitions + assert!(host.value().ends_with(".amazonaws.com")); + let Metric::Gauge(host_count) = host_count.as_ref() else { + panic!("expected gauge for host count metric"); + }; + assert!(*host_count.lock().unwrap() > 0.0); + + // Latency metrics should exist for get_object + let ttfb = metrics + .get_for_label("s3.requests.first_byte_latency_us", "op", "get_object") + .expect("first byte latency should exist"); + let Metric::Histogram(ttfb) = ttfb else { + panic!("expected histogram for first byte latency"); + }; + assert!(ttfb.lock().unwrap().len() > 0); + let total = metrics + .get_for_label("s3.requests.total_latency_us", "op", "get_object") + .expect("total latency should exist"); + let Metric::Histogram(total) = total else { + panic!("expected histogram for total latency"); + }; + assert!(total.lock().unwrap().len() > 0); +} + +rusty_fork_test! { + #[test] + fn get_object_metrics() { + let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + runtime.block_on(test_get_object_metrics()); + } +} + +/// Test metrics and log messages for a head object that gets a 403 error +async fn test_head_object_403() { + let bucket = get_test_bucket_without_permissions(); + + let recorder = TestRecorder::default(); + metrics::set_global_recorder(recorder.clone()).unwrap(); + let _guard = TracingTestLayer::enable(); + + let client: S3CrtClient = get_test_client(); + let err = client + .head_object(&bucket, "some-key") + .await + .expect_err("head to no-permissions bucket should fail"); + assert!(matches!( + err, + ObjectClientError::ClientError(S3RequestError::Forbidden(_)) + )); + + drop(_guard); + let metrics = recorder.metrics.lock().unwrap().clone(); + + // WARN-level message with the failure and the request ID is emitted + let events = TracingTestLayer::take_events(); + // Rather than hard-coding a request ID format, just look for anything that seems long enough + // and doesn't contain `<` (which we use for "unknown") + let request_id = Regex::new(r"request_id=[^\s<]{10}").unwrap(); + events + .iter() + .find(|(level, message)| { + // Higher levels are higher verbosity, so ERROR is the lowest level + *level <= Level::WARN && message.contains("meta request failed") && request_id.is_match(message) + }) + .expect("request ID message not found"); + + // Failures metric is incremented + let failures = metrics + .get_for_label("s3.meta_requests.failures", "op", "head_object") + .expect("failures metric should exist"); + let Metric::Counter(failures) = failures else { + panic!("expected counter for failures metric"); + }; + assert!(*failures.lock().unwrap() > 0); +} + +rusty_fork_test! { + #[test] + fn head_object_403() { + let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + runtime.block_on(test_head_object_403()); + } +}