From 9326a48911c54f2ebe4c1e06f4ba3e52ebcbf03c Mon Sep 17 00:00:00 2001
From: James Bornholt
Date: Tue, 27 Feb 2024 11:54:04 -0600
Subject: [PATCH] Update to new `metrics` crate version (#787)

v0.22.0 of the `metrics` crate made a breaking change to how its macros
work: they now return the counter/gauge/histogram itself, and you call
methods on that handle to record values, rather than recording the metric
as part of the macro invocation. This change is therefore mostly a
find-and-replace to get things compiling again.

Beyond that, there are two new things we'd like to use from this version:

1. Scoped local metrics recorders are now supported, which makes it much
   easier to write unit tests for metrics. Recorders were previously
   global, so tests had to use `rusty_fork` to fork a new test process.
   I've used this change to update the current basic metrics tests.

2. Metrics now include metadata such as severity and module/line location.
   We're not using this yet, but could use it in the future to create
   scoped metrics or different metric severities for our logging.

Minimal sketches of the new macro style and the scoped-recorder test
pattern follow the diff.

Signed-off-by: James Bornholt
---
 Cargo.lock                                    | 17 +----
 mountpoint-s3-client/Cargo.toml               |  2 +-
 mountpoint-s3-client/src/s3_crt_client.rs     | 69 +++++++------------
 mountpoint-s3/Cargo.toml                      |  3 +-
 .../src/data_cache/disk_data_cache.rs         | 25 +++----
 mountpoint-s3/src/fs.rs                       |  8 +--
 mountpoint-s3/src/fuse.rs                     | 18 ++---
 mountpoint-s3/src/inode.rs                    | 16 ++---
 mountpoint-s3/src/inode/negative_cache.rs     | 24 +++----
 mountpoint-s3/src/metrics.rs                  | 64 ++++++++---------
 mountpoint-s3/src/metrics/tracing_span.rs     |  2 +-
 mountpoint-s3/src/prefetch.rs                 | 12 ++--
 mountpoint-s3/src/prefetch/caching_stream.rs  |  8 +--
 mountpoint-s3/src/prefetch/part_queue.rs      |  2 +-
 14 files changed, 109 insertions(+), 161 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a15193888..86a03a74b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1879,26 +1879,14 @@ dependencies = [
 
 [[package]]
 name = "metrics"
-version = "0.21.1"
+version = "0.22.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5"
+checksum = "cd71d9db2e4287c3407fa04378b8c2ee570aebe0854431562cdd89ca091854f4"
 dependencies = [
  "ahash",
- "metrics-macros",
  "portable-atomic",
 ]
 
-[[package]]
-name = "metrics-macros"
-version = "0.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.41",
-]
-
 [[package]]
 name = "minimal-lexical"
 version = "0.2.1"
@@ -1967,7 +1955,6 @@ dependencies = [
  "rand",
  "rand_chacha",
  "regex",
- "rusty-fork",
  "serde",
  "serde_json",
  "serial_test",
diff --git a/mountpoint-s3-client/Cargo.toml b/mountpoint-s3-client/Cargo.toml
index 4d8ffe097..1affcf6d2 100644
--- a/mountpoint-s3-client/Cargo.toml
+++ b/mountpoint-s3-client/Cargo.toml
@@ -17,7 +17,7 @@ const_format = "0.2.30"
 futures = "0.3.24"
 lazy_static = "1.4.0"
 libc = "0.2.126"
-metrics = "0.21.1"
+metrics = "0.22.1"
 once_cell = "1.16.0"
 percent-encoding = "2.2.0"
 pin-project = "1.0.12"
diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs
index b04054143..b89ff5da8 100644
--- a/mountpoint-s3-client/src/s3_crt_client.rs
+++ b/mountpoint-s3-client/src/s3_crt_client.rs
@@ -473,12 +473,12 @@ impl S3CrtClientInner {
             let op = span_telemetry.metadata().map(|m| m.name()).unwrap_or("unknown");
 
             if let Some(ttfb) = ttfb {
-
metrics::histogram!("s3.requests.first_byte_latency_us", ttfb.as_micros() as f64, "op" => op, "type" => request_type); + metrics::histogram!("s3.requests.first_byte_latency_us", "op" => op, "type" => request_type).record(ttfb.as_micros() as f64); } - metrics::histogram!("s3.requests.total_latency_us", duration.as_micros() as f64, "op" => op, "type" => request_type); - metrics::counter!("s3.requests", 1, "op" => op, "type" => request_type); + metrics::histogram!("s3.requests.total_latency_us", "op" => op, "type" => request_type).record(duration.as_micros() as f64); + metrics::counter!("s3.requests", "op" => op, "type" => request_type).increment(1); if request_failure { - metrics::counter!("s3.requests.failures", 1, "op" => op, "type" => request_type, "status" => http_status.unwrap_or(-1).to_string()); + metrics::counter!("s3.requests.failures", "op" => op, "type" => request_type, "status" => http_status.unwrap_or(-1).to_string()).increment(1); } }) .on_headers(move |headers, response_status| { @@ -490,7 +490,7 @@ impl S3CrtClientInner { if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) { let latency = start_time.elapsed().as_micros() as f64; let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown"); - metrics::histogram!("s3.meta_requests.first_byte_latency_us", latency, "op" => op); + metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency); } total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst); @@ -504,12 +504,12 @@ impl S3CrtClientInner { let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown"); let duration = start_time.elapsed(); - metrics::counter!("s3.meta_requests", 1, "op" => op); - metrics::histogram!("s3.meta_requests.total_latency_us", duration.as_micros() as f64, "op" => op); + metrics::counter!("s3.meta_requests", "op" => op).increment(1); + metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64); // Some HTTP requests (like HEAD) don't have a body to stream back, so calculate TTFB now if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) { let latency = duration.as_micros() as f64; - metrics::histogram!("s3.meta_requests.first_byte_latency_us", latency, "op" => op); + metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency); } let total_bytes = total_bytes_clone.load(Ordering::SeqCst); // We only log throughput of object data. 
PUT needs to be measured in its stream @@ -519,7 +519,7 @@ impl S3CrtClientInner { } let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default()); if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) { - metrics::absolute_counter!("s3.client.host_count", host_count as u64, "host" => hostname); + metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64); } let log_level = status_code_to_log_level(request_result.response_status); @@ -548,7 +548,7 @@ impl S3CrtClientInner { } else { -request_result.crt_error.raw_error() }; - metrics::counter!("s3.meta_requests.failures", 1, "op" => op, "status" => format!("{error_status}")); + metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1); // Fill in a generic error if we weren't able to parse one Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result)))) @@ -618,40 +618,19 @@ impl S3CrtClientInner { fn poll_client_metrics(s3_client: &Client) { let metrics = s3_client.poll_client_metrics(); - metrics::gauge!( - "s3.client.num_requests_being_processed", - metrics.num_requests_tracked_requests as f64 - ); - metrics::gauge!( - "s3.client.num_requests_being_prepared", - metrics.num_requests_being_prepared as f64 - ); - metrics::gauge!("s3.client.request_queue_size", metrics.request_queue_size as f64); - metrics::gauge!( - "s3.client.num_auto_default_network_io", - metrics.num_auto_default_network_io as f64 - ); - metrics::gauge!( - "s3.client.num_auto_ranged_get_network_io", - metrics.num_auto_ranged_get_network_io as f64 - ); - metrics::gauge!( - "s3.client.num_auto_ranged_put_network_io", - metrics.num_auto_ranged_put_network_io as f64 - ); - metrics::gauge!( - "s3.client.num_auto_ranged_copy_network_io", - metrics.num_auto_ranged_copy_network_io as f64 - ); - metrics::gauge!("s3.client.num_total_network_io", metrics.num_total_network_io() as f64); - metrics::gauge!( - "s3.client.num_requests_stream_queued_waiting", - metrics.num_requests_stream_queued_waiting as f64 - ); - metrics::gauge!( - "s3.client.num_requests_streaming_response", - metrics.num_requests_streaming_response as f64 - ); + metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64); + metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64); + metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64); + metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64); + metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64); + metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64); + metrics::gauge!("s3.client.num_auto_ranged_copy_network_io") + .set(metrics.num_auto_ranged_copy_network_io as f64); + metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64); + metrics::gauge!("s3.client.num_requests_stream_queued_waiting") + .set(metrics.num_requests_stream_queued_waiting as f64); + metrics::gauge!("s3.client.num_requests_streaming_response") + .set(metrics.num_requests_streaming_response as f64); } fn next_request_counter(&self) -> u64 { @@ -959,7 +938,7 @@ fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) { } else { ">16MiB" }; - 
metrics::histogram!("s3.meta_requests.throughput_mibs", throughput_mbps, "op" => op, "size" => bucket); + metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps); } #[cfg_attr(not(docs_rs), async_trait)] diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index 84d2ef787..c97c5a112 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -28,7 +28,7 @@ hex = "0.4.3" lazy_static = "1.4.0" libc = "0.2.126" linked-hash-map = "0.5.6" -metrics = "0.21.1" +metrics = "0.22.1" nix = { version = "0.27.1", features = ["user"] } regex = "1.7.1" serde = { version = "1.0.190", features = ["derive"] } @@ -62,7 +62,6 @@ proptest = "1.4.0" proptest-derive = "0.4.0" rand = "0.8.5" rand_chacha = "0.3.1" -rusty-fork = "0.3.0" serial_test = "2.0.0" sha2 = "0.10.6" shuttle = { version = "0.5.0" } diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 237ce4fef..9799c32d2 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -366,14 +366,14 @@ impl DataCache for DiskDataCache { match self.read_block(&path, cache_key, block_idx, block_offset) { Ok(None) => { // Cache miss. - metrics::counter!("disk_data_cache.block_hit", 0); + metrics::counter!("disk_data_cache.block_hit").increment(0); Ok(None) } Ok(Some(bytes)) => { // Cache hit. - metrics::counter!("disk_data_cache.block_hit", 1); - metrics::counter!("disk_data_cache.total_bytes", bytes.len() as u64, "type" => "read"); - metrics::histogram!("disk_data_cache.read_duration_us", start.elapsed().as_micros() as f64); + metrics::counter!("disk_data_cache.block_hit").increment(1); + metrics::counter!("disk_data_cache.total_bytes", "type" => "read").increment(bytes.len() as u64); + metrics::histogram!("disk_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64); if let Some(usage) = &self.usage { usage.lock().unwrap().refresh(&block_key); } @@ -381,8 +381,8 @@ impl DataCache for DiskDataCache { } Err(err) => { // Invalid block. Count as cache miss. 
- metrics::counter!("disk_data_cache.block_hit", 0); - metrics::counter!("disk_data_cache.block_err", 1); + metrics::counter!("disk_data_cache.block_hit").increment(0); + metrics::counter!("disk_data_cache.block_err").increment(1); match fs::remove_file(&path) { Ok(()) => { if let Some(usage) = &self.usage { @@ -419,20 +419,15 @@ impl DataCache for DiskDataCache { { let eviction_start = Instant::now(); let result = self.evict_if_needed(); - metrics::histogram!( - "disk_data_cache.eviction_duration_us", - eviction_start.elapsed().as_micros() as f64 - ); + metrics::histogram!("disk_data_cache.eviction_duration_us") + .record(eviction_start.elapsed().as_micros() as f64); result }?; let write_start = Instant::now(); let size = self.write_block(path, block)?; - metrics::histogram!( - "disk_data_cache.write_duration_us", - write_start.elapsed().as_micros() as f64 - ); - metrics::counter!("disk_data_cache.total_bytes", bytes_len as u64, "type" => "write"); + metrics::histogram!("disk_data_cache.write_duration_us").record(write_start.elapsed().as_micros() as f64); + metrics::counter!("disk_data_cache.total_bytes", "type" => "write").increment(bytes_len as u64); if let Some(usage) = &self.usage { usage.lock().unwrap().add(block_key, size); } diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 2ec642dc2..b66e5607e 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -117,7 +117,7 @@ where } Ok(request) => FileHandleState::Write(UploadState::InProgress { request, handle }), }; - metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "write"); + metrics::gauge!("fs.current_handles", "type" => "write").increment(1.0); Ok(handle) } @@ -142,7 +142,7 @@ where .prefetcher .prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone()); let handle = FileHandleState::Read(request); - metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "read"); + metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0); Ok(handle) } } @@ -1136,7 +1136,7 @@ where let request = match file_handle.state.into_inner() { FileHandleState::Read { .. } => { // TODO make sure we cancel the inflight PrefetchingGetRequest. is just dropping enough? - metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "read"); + metrics::gauge!("fs.current_handles", "type" => "read").decrement(1.0); file_handle.inode.finish_reading()?; return Ok(()); } @@ -1144,7 +1144,7 @@ where }; let result = request.complete_if_in_progress(&file_handle.full_key).await; - metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "write"); + metrics::gauge!("fs.current_handles", "type" => "write").decrement(1.0); // Errors won't actually be seen by the user because `release` is async, // but it's the right thing to do. result diff --git a/mountpoint-s3/src/fuse.rs b/mountpoint-s3/src/fuse.rs index f8d800359..9eadce513 100644 --- a/mountpoint-s3/src/fuse.rs +++ b/mountpoint-s3/src/fuse.rs @@ -40,7 +40,7 @@ macro_rules! fuse_error { ($name:literal, $reply:expr, $err:expr) => {{ let err = $err; event!(err.level, "{} failed: {:#}", $name, err); - ::metrics::counter!("fuse.op_failures", 1, "op" => $name); + ::metrics::counter!("fuse.op_failures", "op" => $name).increment(1); $reply.error(err.to_errno()); }}; } @@ -49,8 +49,8 @@ macro_rules! fuse_error { macro_rules! 
fuse_unsupported {
    ($name:literal, $reply:expr, $err:expr, $level:expr) => {{
         event!($level, "{} failed: operation not supported by Mountpoint", $name);
-        ::metrics::counter!("fuse.op_failures", 1, "op" => $name);
-        ::metrics::counter!("fuse.op_unimplemented", 1, "op" => $name);
+        ::metrics::counter!("fuse.op_failures", "op" => $name).increment(1);
+        ::metrics::counter!("fuse.op_unimplemented", "op" => $name).increment(1);
         $reply.error($err);
     }};
     ($name:literal, $reply:expr) => {
@@ -150,8 +150,8 @@ where
             Err(err) => fuse_error!("read", reply, err),
         }
 
-        metrics::counter!("fuse.total_bytes", bytes_sent as u64, "type" => "read");
-        metrics::histogram!("fuse.io_size", bytes_sent as f64, "type" => "read");
+        metrics::counter!("fuse.total_bytes", "type" => "read").increment(bytes_sent as u64);
+        metrics::histogram!("fuse.io_size", "type" => "read").record(bytes_sent as f64);
     }
 
     #[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=parent, name=field::Empty))]
@@ -188,7 +188,7 @@ where
         match block_on(self.fs.readdir(parent, fh, offset, replier).in_current_span()) {
             Ok(_) => {
                 reply.ok();
-                metrics::counter!("fuse.readdir.entries", count as u64);
+                metrics::counter!("fuse.readdir.entries").increment(count as u64);
             }
             Err(e) => fuse_error!("readdir", reply, e),
         }
@@ -234,7 +234,7 @@ where
         match block_on(self.fs.readdirplus(parent, fh, offset, replier).in_current_span()) {
             Ok(_) => {
                 reply.ok();
-                metrics::counter!("fuse.readdirplus.entries", count as u64);
+                metrics::counter!("fuse.readdirplus.entries").increment(count as u64);
             }
             Err(e) => fuse_error!("readdirplus", reply, e),
         }
@@ -332,8 +332,8 @@ where
         ) {
             Ok(bytes_written) => {
                 reply.written(bytes_written);
-                metrics::counter!("fuse.total_bytes", bytes_written as u64, "type" => "write");
-                metrics::histogram!("fuse.io_size", bytes_written as f64, "type" => "write");
+                metrics::counter!("fuse.total_bytes", "type" => "write").increment(bytes_written as u64);
+                metrics::histogram!("fuse.io_size", "type" => "write").record(bytes_written as f64);
             }
             Err(e) => fuse_error!("write", reply, e),
         }
diff --git a/mountpoint-s3/src/inode.rs b/mountpoint-s3/src/inode.rs
index 7643cfa34..99c727dfb 100644
--- a/mountpoint-s3/src/inode.rs
+++ b/mountpoint-s3/src/inode.rs
@@ -192,10 +192,8 @@ impl Superblock {
             writing_children.remove(&ino);
 
             if let Ok(state) = inode.get_inode_state() {
-                metrics::counter!(
-                    "metadata_cache.inode_forgotten_before_expiry",
-                    state.stat.is_valid().into(),
-                );
+                metrics::counter!("metadata_cache.inode_forgotten_before_expiry")
+                    .increment(state.stat.is_valid().into());
             };
         }
     }
@@ -674,7 +672,7 @@ impl SuperblockInner {
             Some(lookup) => trace!("lookup returned from cache: {:?}", lookup),
             None => trace!("no lookup available from cache"),
         }
-        metrics::counter!("metadata_cache.cache_hit", lookup.is_some().into());
+        metrics::counter!("metadata_cache.cache_hit").increment(lookup.is_some().into());
 
         lookup
     }
@@ -1588,8 +1586,8 @@ impl InodeMap {
     }
 
     fn insert(&mut self, ino: InodeNo, inode: Inode) -> Option<Inode> {
-        metrics::increment_gauge!("fs.inodes", 1.0);
-        metrics::increment_gauge!("fs.inode_kinds", 1.0, "kind" => inode.kind().as_str());
+        metrics::gauge!("fs.inodes").increment(1.0);
+        metrics::gauge!("fs.inode_kinds", "kind" => inode.kind().as_str()).increment(1.0);
         self.map.insert(ino, inode).inspect(Self::remove_metrics)
     }
 
@@ -1598,8 +1596,8 @@ impl InodeMap {
     }
 
     fn remove_metrics(inode: &Inode) {
-        metrics::decrement_gauge!("fs.inodes", 1.0);
-        metrics::decrement_gauge!("fs.inode_kinds", 1.0, "kind" => inode.kind().as_str());
+
metrics::gauge!("fs.inodes").decrement(1.0); + metrics::gauge!("fs.inode_kinds", "kind" => inode.kind().as_str()).decrement(1.0); } } diff --git a/mountpoint-s3/src/inode/negative_cache.rs b/mountpoint-s3/src/inode/negative_cache.rs index 10a9d1e29..c8d41e29c 100644 --- a/mountpoint-s3/src/inode/negative_cache.rs +++ b/mountpoint-s3/src/inode/negative_cache.rs @@ -49,10 +49,10 @@ impl NegativeCache { .is_some_and(|expiry| !expiry.is_expired()); metrics::histogram!( "metadata_cache.negative_cache.operation_duration_us", - start.elapsed().as_micros() as f64, "op" => "contains", - ); - metrics::counter!("metadata_cache.negative_cache.cache_hit", contains_current.into()); + ) + .record(start.elapsed().as_micros() as f64); + metrics::counter!("metadata_cache.negative_cache.cache_hit").increment(contains_current.into()); contains_current } @@ -65,13 +65,13 @@ impl NegativeCache { let start = Instant::now(); let mut map = self.map.write().unwrap(); if map.remove(&key).is_some() { - metrics::gauge!("metadata_cache.negative_cache.entries", map.len() as f64); + metrics::gauge!("metadata_cache.negative_cache.entries").set(map.len() as f64); } metrics::histogram!( "metadata_cache.negative_cache.operation_duration_us", - start.elapsed().as_micros() as f64, "op" => "remove", - ); + ) + .record(start.elapsed().as_micros() as f64); } /// Insert an entry into the cache. If the entry already existed, @@ -98,19 +98,17 @@ impl NegativeCache { break; }; // Report how many entries are evicted while still current. - metrics::counter!( - "metadata_cache.negative_cache.entries_evicted_before_expiry", - (!e.is_expired()).into() - ); + metrics::counter!("metadata_cache.negative_cache.entries_evicted_before_expiry") + .increment((!e.is_expired()).into()); } - metrics::gauge!("metadata_cache.negative_cache.entries", map.len() as f64); + metrics::gauge!("metadata_cache.negative_cache.entries").set(map.len() as f64); } metrics::histogram!( "metadata_cache.negative_cache.operation_duration_us", - start.elapsed().as_micros() as f64, "op" => "insert", - ); + ) + .record(start.elapsed().as_micros() as f64); } } diff --git a/mountpoint-s3/src/metrics.rs b/mountpoint-s3/src/metrics.rs index 4c928968f..de5f89db4 100644 --- a/mountpoint-s3/src/metrics.rs +++ b/mountpoint-s3/src/metrics.rs @@ -7,7 +7,7 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use dashmap::DashMap; -use metrics::{Key, Recorder}; +use metrics::{Key, Metadata, Recorder}; use crate::sync::mpsc::{channel, RecvTimeoutError, Sender}; use crate::sync::Arc; @@ -56,7 +56,7 @@ pub fn install() -> MetricsSinkHandle { }; let recorder = MetricsRecorder { sink }; - metrics::set_boxed_recorder(Box::new(recorder)).unwrap(); + metrics::set_global_recorder(recorder).unwrap(); handle } @@ -154,15 +154,15 @@ impl Recorder for MetricsRecorder { // No-op -- we don't implement descriptions } - fn register_counter(&self, key: &Key) -> metrics::Counter { + fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> metrics::Counter { self.sink.counter(key) } - fn register_gauge(&self, key: &Key) -> metrics::Gauge { + fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> metrics::Gauge { self.sink.gauge(key) } - fn register_histogram(&self, key: &Key) -> metrics::Histogram { + fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> metrics::Histogram { self.sink.histogram(key) } } @@ -185,40 +185,34 @@ impl Drop for MetricsSinkHandle { #[cfg(test)] mod tests { use super::*; - use metrics::Label; - use rusty_fork::rusty_fork_test; + 
use metrics::{with_local_recorder, Label};
 
     const TEST_COUNTER: &str = "test_counter";
     const TEST_GAUGE: &str = "test_gauge";
     const TEST_HISTOGRAM: &str = "test_histogram";
 
-    // Since `metric` crate operates on global recorders,
-    // we need to make sure we're evaluating against the metrics emitted
-    // by this test without interference from other tests.
-    rusty_fork_test! {
-        #[test]
-        fn basic_metrics() {
-            let sink = Arc::new(MetricsSink::new());
-            let recorder = MetricsRecorder { sink: sink.clone() };
-            metrics::set_boxed_recorder(Box::new(recorder)).unwrap();
-
+    #[test]
+    fn basic_metrics() {
+        let sink = Arc::new(MetricsSink::new());
+        let recorder = MetricsRecorder { sink: sink.clone() };
+        with_local_recorder(&recorder, || {
             // Run twice to check reset works
             for _ in 0..2 {
-                metrics::counter!(TEST_COUNTER, 1, "type" => "get");
-                metrics::counter!(TEST_COUNTER, 1, "type" => "put");
-                metrics::counter!(TEST_COUNTER, 2, "type" => "get");
-                metrics::counter!(TEST_COUNTER, 2, "type" => "put");
-                metrics::counter!(TEST_COUNTER, 3, "type" => "get");
-                metrics::counter!(TEST_COUNTER, 4, "type" => "put");
-
-                metrics::gauge!(TEST_GAUGE, 5.0, "type" => "processing");
-                metrics::gauge!(TEST_GAUGE, 5.0, "type" => "in_queue");
-                metrics::gauge!(TEST_GAUGE, 2.0, "type" => "processing");
-                metrics::gauge!(TEST_GAUGE, 3.0, "type" => "in_queue");
-
-                metrics::histogram!(TEST_HISTOGRAM, 3.0, "type" => "get");
-                metrics::histogram!(TEST_HISTOGRAM, 4.0, "type" => "put");
-                metrics::histogram!(TEST_HISTOGRAM, 4.0, "type" => "put");
+                metrics::counter!(TEST_COUNTER, "type" => "get").increment(1);
+                metrics::counter!(TEST_COUNTER, "type" => "put").increment(1);
+                metrics::counter!(TEST_COUNTER, "type" => "get").increment(2);
+                metrics::counter!(TEST_COUNTER, "type" => "put").increment(2);
+                metrics::counter!(TEST_COUNTER, "type" => "get").increment(3);
+                metrics::counter!(TEST_COUNTER, "type" => "put").increment(4);
+
+                metrics::gauge!(TEST_GAUGE, "type" => "processing").set(5.0);
+                metrics::gauge!(TEST_GAUGE, "type" => "in_queue").set(5.0);
+                metrics::gauge!(TEST_GAUGE, "type" => "processing").set(2.0);
+                metrics::gauge!(TEST_GAUGE, "type" => "in_queue").set(3.0);
+
+                metrics::histogram!(TEST_HISTOGRAM, "type" => "get").record(3.0);
+                metrics::histogram!(TEST_HISTOGRAM, "type" => "put").record(4.0);
+                metrics::histogram!(TEST_HISTOGRAM, "type" => "put").record(4.0);
 
                 for mut entry in sink.metrics.iter_mut() {
                     let (key, metric) = entry.pair_mut();
@@ -277,8 +271,8 @@ mod tests {
             }
 
             // Set the gauges to zero and check they emit their change only once
-            metrics::gauge!(TEST_GAUGE, 0.0, "type" => "processing");
-            metrics::gauge!(TEST_GAUGE, 0.0, "type" => "in_queue");
+            metrics::gauge!(TEST_GAUGE, "type" => "processing").set(0.0);
+            metrics::gauge!(TEST_GAUGE, "type" => "in_queue").set(0.0);
             for mut entry in sink.metrics.iter_mut() {
                 let metric = entry.value_mut();
                 let Metric::Gauge(inner) = metric else {
                 };
                 assert!(inner.load_if_changed().is_some());
                 assert!(inner.load_if_changed().is_none());
             }
-    }
+        });
     }
 }
diff --git a/mountpoint-s3/src/metrics/tracing_span.rs b/mountpoint-s3/src/metrics/tracing_span.rs
index d6f53901e..4d0b235f6 100644
--- a/mountpoint-s3/src/metrics/tracing_span.rs
+++ b/mountpoint-s3/src/metrics/tracing_span.rs
@@ -45,7 +45,7 @@ where
         if Self::should_instrument_request_time(ctx.span(&id)) {
             let data = ctx.span(&id).unwrap();
             let RequestTime(start_time) = *data.extensions().get::<RequestTime>().unwrap();
-            histogram!("fuse.op_latency_us", 
start_time.elapsed().as_micros() as f64, "op" => data.name());
+            histogram!("fuse.op_latency_us", "op" => data.name()).record(start_time.elapsed().as_micros() as f64);
         }
     }
 }
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 98808af75..70c4a97ea 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -274,7 +274,7 @@ where
                 actual = offset,
                 "out-of-order read, resetting prefetch"
             );
-            counter!("prefetch.out_of_order", 1);
+            counter!("prefetch.out_of_order").increment(1);
 
             // This is an approximation, tolerating some seeking caused by concurrent readahead.
             self.record_contiguous_read_metric();
@@ -503,7 +503,7 @@ where
             self.backward_seek_window.push(part);
         }
 
-        histogram!("prefetch.seek_distance", total_seek_distance as f64, "dir" => "forward");
+        histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64);
 
         Ok(true)
     }
@@ -525,7 +525,7 @@ where
         self.current_task = Some(request);
         self.next_sequential_read_offset = offset;
 
-        histogram!("prefetch.seek_distance", backwards_length_needed as f64, "dir" => "backward");
+        histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64);
 
         Ok(true)
     }
@@ -536,10 +536,8 @@ impl PrefetchGetObject
     #[inline]
     fn record_contiguous_read_metric(&mut self) {
-        histogram!(
-            "prefetch.contiguous_read_len",
-            (self.next_sequential_read_offset - self.sequential_read_start_offset) as f64,
-        );
+        histogram!("prefetch.contiguous_read_len")
+            .record((self.next_sequential_read_offset - self.sequential_read_start_offset) as f64);
     }
diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs
--- a/mountpoint-s3/src/prefetch/caching_stream.rs
+++ b/mountpoint-s3/src/prefetch/caching_stream.rs
@@ -269,7 +269,7 @@ where
                 warn!(key=?self.cache_key.key(), block_index, ?error, "failed to update cache");
             }
         };
-        metrics::histogram!("prefetch.cache_update_duration_us", start.elapsed().as_micros() as f64);
+        metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64);
     }
 
     /// Creates a Part that can be streamed to the prefetcher from the given cache block.
diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs
index 55970c446..f92f274c5 100644
--- a/mountpoint-s3/src/prefetch/part_queue.rs
+++ b/mountpoint-s3/src/prefetch/part_queue.rs
@@ -59,7 +59,7 @@ impl PartQueue {
         } else {
             let start = Instant::now();
             let part = self.receiver.recv().await;
-            metrics::histogram!("prefetch.part_queue_starved_us", start.elapsed().as_micros() as f64);
+            metrics::histogram!("prefetch.part_queue_starved_us").record(start.elapsed().as_micros() as f64);
             match part {
                 Err(RecvError) => Err(PrefetchReadError::GetRequestTerminatedUnexpectedly),
                 Ok(part) => part,
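
For reviewers unfamiliar with the new API: a minimal sketch of the macro
change this patch applies throughout, assuming the `metrics` 0.22 API. The
metric names here are illustrative examples, not metrics Mountpoint emits.

    use std::time::Instant;

    fn handle_request() {
        let start = Instant::now();

        // metrics 0.21 recorded the value inside the macro invocation:
        //     metrics::counter!("app.requests", 1, "op" => "get");
        // metrics 0.22 returns a handle from the macro instead, and the
        // value is recorded through a method on that handle:
        metrics::counter!("app.requests", "op" => "get").increment(1);
        metrics::gauge!("app.in_flight").increment(1.0);

        // ... serve the request ...

        metrics::gauge!("app.in_flight").decrement(1.0);
        metrics::histogram!("app.latency_us", "op" => "get")
            .record(start.elapsed().as_micros() as f64);
    }

The handle methods map one-to-one onto the old macros, which is why the
diff is mechanical: counter!(..., n) becomes counter!(...).increment(n),
gauge!(..., x) becomes gauge!(...).set(x), increment_gauge!/decrement_gauge!
become gauge!(...).increment(x)/.decrement(x), and histogram!(..., x)
becomes histogram!(...).record(x).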
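The scoped-recorder pattern from item 1, as a self-contained sketch. It
assumes the `metrics` 0.22 API; `CapturingRecorder` and its registration
count are hypothetical stand-ins for this crate's `MetricsRecorder` and
`MetricsSink`, not the real test. The `Metadata` argument is the
severity/module/line information item 2 refers to.

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};

    #[derive(Default)]
    struct CapturingRecorder {
        registrations: Arc<AtomicU64>,
    }

    impl Recorder for CapturingRecorder {
        fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
        fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
        fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}

        fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
            // New in 0.22: every registration carries metadata such as the
            // emitting module, which we could route or filter on later.
            println!("counter {} from {:?}", key.name(), metadata.module_path());
            self.registrations.fetch_add(1, Ordering::SeqCst);
            Counter::noop()
        }

        fn register_gauge(&self, _key: &Key, _metadata: &Metadata<'_>) -> Gauge {
            self.registrations.fetch_add(1, Ordering::SeqCst);
            Gauge::noop()
        }

        fn register_histogram(&self, _key: &Key, _metadata: &Metadata<'_>) -> Histogram {
            self.registrations.fetch_add(1, Ordering::SeqCst);
            Histogram::noop()
        }
    }

    #[test]
    fn basic_metrics_without_forking() {
        let recorder = CapturingRecorder::default();
        let registrations = recorder.registrations.clone();

        // The recorder is installed only for the duration of this closure,
        // so parallel tests no longer interfere with each other and
        // `rusty_fork` is not needed to isolate them.
        metrics::with_local_recorder(&recorder, || {
            metrics::counter!("test.counter", "type" => "get").increment(1);
            metrics::gauge!("test.gauge").set(2.0);
        });

        assert_eq!(registrations.load(Ordering::SeqCst), 2);
    }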