Skip to content

Commit

Permalink
Allow clients to define custom callbacks to handle telemetry (#1080)
Browse files Browse the repository at this point in the history
## Description of change
Different users of mountpoint will care about different metrics returned
for each requests, so allow them to define their own custom handlers for
the on_telemetry callback in addition to the default metrics that
mountpoint emits.

This allows users to do things like: 
- emit extended request ids ("x-amz-id-2")
- When some criteria is met, log out additional information

Relevant issues: #1079 

## Does this change impact existing behavior?


No there should be no breaking changes, the only visible change is that
there's a new field to the S3ClientConfig which defines the custom
telemetry handler

## Does this change need a changelog entry in any of the crates?

Just a note in mountpoint-s3-client letting users know this feature now
exists

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Hans Pistor <[email protected]>
Signed-off-by: aws-hans-pistor <[email protected]>
Co-authored-by: Volodkin Vladislav <[email protected]>
  • Loading branch information
aws-hans-pistor and vladem authored Jan 13, 2025
1 parent 19d06e2 commit ab77aaa
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 3 deletions.
4 changes: 3 additions & 1 deletion mountpoint-s3-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ pub mod error_metadata;

pub use object_client::{ObjectClient, PutObjectRequest};

pub use s3_crt_client::{get_object::S3GetObjectResponse, put_object::S3PutObjectRequest, S3CrtClient, S3RequestError};
pub use s3_crt_client::{
get_object::S3GetObjectResponse, put_object::S3PutObjectRequest, OnTelemetry, S3CrtClient, S3RequestError,
};

/// Configuration for the S3 client
pub mod config {
Expand Down
21 changes: 21 additions & 0 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub struct S3ClientConfig {
read_backpressure: bool,
initial_read_window: usize,
network_interface_names: Vec<String>,
telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}

impl Default for S3ClientConfig {
Expand All @@ -120,6 +121,7 @@ impl Default for S3ClientConfig {
read_backpressure: false,
initial_read_window: DEFAULT_PART_SIZE,
network_interface_names: vec![],
telemetry_callback: None,
}
}
}
Expand Down Expand Up @@ -221,6 +223,13 @@ impl S3ClientConfig {
self.network_interface_names = network_interface_names;
self
}

/// Set a custom telemetry callback handler
#[must_use = "S3ClientConfig follows a builder pattern"]
pub fn telemetry_callback(mut self, telemetry_callback: Arc<dyn OnTelemetry>) -> Self {
self.telemetry_callback = Some(telemetry_callback);
self
}
}

/// Authentication configuration for the CRT-based S3 client
Expand Down Expand Up @@ -288,6 +297,7 @@ struct S3CrtClientInner {
bucket_owner: Option<String>,
credentials_provider: Option<CredentialsProvider>,
host_resolver: HostResolver,
telemetry_callback: Option<Arc<dyn OnTelemetry>>,
}

impl S3CrtClientInner {
Expand Down Expand Up @@ -422,6 +432,7 @@ impl S3CrtClientInner {
bucket_owner: config.bucket_owner,
credentials_provider: Some(credentials_provider),
host_resolver,
telemetry_callback: config.telemetry_callback,
})
}

Expand Down Expand Up @@ -551,6 +562,7 @@ impl S3CrtClientInner {
let endpoint = options.get_endpoint().expect("S3Message always has an endpoint");
let hostname = endpoint.host_name().to_str().unwrap().to_owned();
let host_resolver = self.host_resolver.clone();
let telemetry_callback = self.telemetry_callback.clone();

let start_time = Instant::now();
let first_body_part = Arc::new(AtomicBool::new(true));
Expand Down Expand Up @@ -595,6 +607,10 @@ impl S3CrtClientInner {
} else if request_canceled {
metrics::counter!("s3.requests.canceled", "op" => op, "type" => request_type).increment(1);
}

if let Some(telemetry_callback) = &telemetry_callback {
telemetry_callback.on_telemetry(metrics);
}
})
.on_headers(move |headers, response_status| {
(on_headers)(headers, response_status);
Expand Down Expand Up @@ -1370,6 +1386,11 @@ impl ObjectClient for S3CrtClient {
}
}

/// Custom handling of telemetry events
pub trait OnTelemetry: std::fmt::Debug + Send + Sync {
fn on_telemetry(&self, request_metrics: &RequestMetrics);
}

#[cfg(test)]
mod tests {
use mountpoint_s3_crt::common::error::Error;
Expand Down
10 changes: 9 additions & 1 deletion mountpoint-s3-client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use bytes::Bytes;
use futures::{pin_mut, Stream, StreamExt};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::{ClientBackpressureHandle, GetObjectResponse};
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_client::{OnTelemetry, S3CrtClient};
use mountpoint_s3_crt::common::allocator::Allocator;
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use mountpoint_s3_crt::common::uri::Uri;
use rand::rngs::OsRng;
use rand::RngCore;
use std::ops::Range;
use std::sync::Arc;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt as _;
use tracing_subscriber::{EnvFilter, Layer};
Expand Down Expand Up @@ -87,6 +88,13 @@ pub fn get_test_backpressure_client(initial_read_window: usize, part_size: Optio
S3CrtClient::new(config).expect("could not create test client")
}

pub fn get_test_client_with_custom_telemetry(telemetry_callback: Arc<dyn OnTelemetry>) -> S3CrtClient {
let config = S3ClientConfig::new()
.endpoint_config(get_test_endpoint_config())
.telemetry_callback(telemetry_callback);
S3CrtClient::new(config).expect("could not create test client")
}

pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) {
let bucket = get_test_bucket();
let prefix = get_unique_test_prefix(test_name);
Expand Down
73 changes: 72 additions & 1 deletion mountpoint-s3-client/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use metrics::{
};
use mountpoint_s3_client::error::ObjectClientError;
use mountpoint_s3_client::types::{GetObjectParams, HeadObjectParams};
use mountpoint_s3_client::{ObjectClient, S3CrtClient, S3RequestError};
use mountpoint_s3_client::{ObjectClient, OnTelemetry, S3CrtClient, S3RequestError};
use mountpoint_s3_crt::s3::client::RequestMetrics;
use regex::Regex;
use rusty_fork::rusty_fork_test;
use tracing::Level;
Expand Down Expand Up @@ -280,3 +281,73 @@ rusty_fork_test! {
runtime.block_on(test_head_object_403());
}
}

async fn test_custom_telemetry_callback() {
let sdk_client = get_test_sdk_client().await;
let (bucket, prefix) = get_test_bucket_and_prefix("test_custom_telemetry_callback");

let key = format!("{prefix}/test");
let body = vec![0x42; 100];
sdk_client
.put_object()
.bucket(&bucket)
.key(&key)
.body(ByteStream::from(body.clone()))
.send()
.await
.unwrap();

let recorder = TestRecorder::default();
metrics::set_global_recorder(recorder.clone()).unwrap();

#[derive(Debug)]
struct CustomOnTelemetry {
metric_name: String,
}

impl OnTelemetry for CustomOnTelemetry {
fn on_telemetry(&self, request_metrics: &RequestMetrics) {
metrics::counter!(self.metric_name.clone()).absolute(request_metrics.total_duration().as_micros() as u64);
}
}

let request_duration_metric_name = "request_duration_us";

let custom_telemetry_callback = CustomOnTelemetry {
metric_name: String::from(request_duration_metric_name),
};

let client = get_test_client_with_custom_telemetry(Arc::new(custom_telemetry_callback));
let result = client
.get_object(&bucket, &key, &GetObjectParams::new())
.await
.expect("get_object should succeed");
let result = result
.map_ok(|(_offset, bytes)| bytes.len())
.try_fold(0, |a, b| async move { Ok(a + b) })
.await
.expect("get_object should succeed");
assert_eq!(result, body.len());

let metrics = recorder.metrics.lock().unwrap().clone();

let (_, request_duration_us) = metrics
.get(request_duration_metric_name, None, None)
.expect("The custom metric should be emitted");

let Metric::Counter(request_duration_us) = request_duration_us else {
panic!("Expected a counter metric")
};
assert!(
*request_duration_us.lock().unwrap() > 0,
"The request duration should be more than 0 microseconds"
);
}

rusty_fork_test! {
#[test]
fn custom_telemetry_callbacks_are_called() {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(test_custom_telemetry_callback());
}
}

0 comments on commit ab77aaa

Please sign in to comment.