Update to new metrics crate version (#787)
v0.22.0 of the metrics crate made a breaking change to how its macros
work -- they now return the counter/gauge/histogram itself and you call
methods on it to record metrics, rather than recording the metric as
part of the macro call. So this change is mostly a find-and-replace to
get things compiling again.
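
For illustration, here is a minimal before/after sketch of the macro API
change (the metric names and values are illustrative, mirroring the
diffs below):

    // metrics 0.21: the value was an argument to the macro itself
    metrics::counter!("s3.requests", 1, "op" => "get");

    // metrics 0.22: the macro returns a handle and you call methods on it
    metrics::counter!("s3.requests", "op" => "get").increment(1);
    metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0);
    metrics::histogram!("fuse.io_size", "type" => "read").record(4096.0);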

Other than that change, there are two new things we'd like to use from
this new version:

1. Scoped local metrics recorders are now supported, which makes it much
   easier to write unit tests for metrics (see the sketch after this
   list). Recorders were previously global, so tests had to use
   `rusty_fork` to fork a new test process. I've used this change to
   update the existing basic metrics tests.
2. Metrics now include metadata such as severity and module/line
   location. We're not using this yet, but could use it in the future
   to create scoped metrics or to assign different severities to the
   metrics we log.
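
As a sketch of how the scoped recorders in (1) simplify testing --
assuming metrics-util's DebuggingRecorder, with an illustrative test
rather than one from this commit:

    use metrics_util::debugging::DebuggingRecorder;

    #[test]
    fn request_counter_is_recorded() {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        // Run the code under test with a recorder scoped to this closure,
        // instead of installing a global recorder and forking the process.
        metrics::with_local_recorder(&recorder, || {
            metrics::counter!("s3.requests", "op" => "get").increment(1);
        });
        // The snapshot contains exactly the metrics recorded in the closure.
        assert_eq!(snapshotter.snapshot().into_vec().len(), 1);
    }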

Signed-off-by: James Bornholt <[email protected]>
jamesbornholt authored Feb 27, 2024
1 parent 5ce4863 commit 9326a48
Showing 14 changed files with 109 additions and 161 deletions.
17 changes: 2 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion mountpoint-s3-client/Cargo.toml
@@ -17,7 +17,7 @@ const_format = "0.2.30"
futures = "0.3.24"
lazy_static = "1.4.0"
libc = "0.2.126"
metrics = "0.21.1"
metrics = "0.22.1"
once_cell = "1.16.0"
percent-encoding = "2.2.0"
pin-project = "1.0.12"
69 changes: 24 additions & 45 deletions mountpoint-s3-client/src/s3_crt_client.rs
@@ -473,12 +473,12 @@ impl S3CrtClientInner {

let op = span_telemetry.metadata().map(|m| m.name()).unwrap_or("unknown");
if let Some(ttfb) = ttfb {
metrics::histogram!("s3.requests.first_byte_latency_us", ttfb.as_micros() as f64, "op" => op, "type" => request_type);
metrics::histogram!("s3.requests.first_byte_latency_us", "op" => op, "type" => request_type).record(ttfb.as_micros() as f64);
}
metrics::histogram!("s3.requests.total_latency_us", duration.as_micros() as f64, "op" => op, "type" => request_type);
metrics::counter!("s3.requests", 1, "op" => op, "type" => request_type);
metrics::histogram!("s3.requests.total_latency_us", "op" => op, "type" => request_type).record(duration.as_micros() as f64);
metrics::counter!("s3.requests", "op" => op, "type" => request_type).increment(1);
if request_failure {
metrics::counter!("s3.requests.failures", 1, "op" => op, "type" => request_type, "status" => http_status.unwrap_or(-1).to_string());
metrics::counter!("s3.requests.failures", "op" => op, "type" => request_type, "status" => http_status.unwrap_or(-1).to_string()).increment(1);
}
})
.on_headers(move |headers, response_status| {
@@ -490,7 +490,7 @@ impl S3CrtClientInner {
if first_body_part.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
let latency = start_time.elapsed().as_micros() as f64;
let op = span_body.metadata().map(|m| m.name()).unwrap_or("unknown");
metrics::histogram!("s3.meta_requests.first_byte_latency_us", latency, "op" => op);
metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
}
total_bytes.fetch_add(data.len() as u64, Ordering::SeqCst);

@@ -504,12 +504,12 @@ impl S3CrtClientInner {
let op = span_finish.metadata().map(|m| m.name()).unwrap_or("unknown");
let duration = start_time.elapsed();

metrics::counter!("s3.meta_requests", 1, "op" => op);
metrics::histogram!("s3.meta_requests.total_latency_us", duration.as_micros() as f64, "op" => op);
metrics::counter!("s3.meta_requests", "op" => op).increment(1);
metrics::histogram!("s3.meta_requests.total_latency_us", "op" => op).record(duration.as_micros() as f64);
// Some HTTP requests (like HEAD) don't have a body to stream back, so calculate TTFB now
if first_body_part_clone.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).ok() == Some(true) {
let latency = duration.as_micros() as f64;
metrics::histogram!("s3.meta_requests.first_byte_latency_us", latency, "op" => op);
metrics::histogram!("s3.meta_requests.first_byte_latency_us", "op" => op).record(latency);
}
let total_bytes = total_bytes_clone.load(Ordering::SeqCst);
// We only log throughput of object data. PUT needs to be measured in its stream
@@ -519,7 +519,7 @@ impl S3CrtClientInner {
}
let hostname_awsstring = AwsString::from_str(&hostname, &Allocator::default());
if let Ok(host_count) = host_resolver.get_host_address_count(&hostname_awsstring, AddressKinds::a()) {
-metrics::absolute_counter!("s3.client.host_count", host_count as u64, "host" => hostname);
+metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
}

let log_level = status_code_to_log_level(request_result.response_status);
@@ -548,7 +548,7 @@ impl S3CrtClientInner {
} else {
-request_result.crt_error.raw_error()
};
metrics::counter!("s3.meta_requests.failures", 1, "op" => op, "status" => format!("{error_status}"));
metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);

// Fill in a generic error if we weren't able to parse one
Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
@@ -618,40 +618,19 @@ impl S3CrtClientInner {

fn poll_client_metrics(s3_client: &Client) {
let metrics = s3_client.poll_client_metrics();
-metrics::gauge!(
-    "s3.client.num_requests_being_processed",
-    metrics.num_requests_tracked_requests as f64
-);
-metrics::gauge!(
-    "s3.client.num_requests_being_prepared",
-    metrics.num_requests_being_prepared as f64
-);
-metrics::gauge!("s3.client.request_queue_size", metrics.request_queue_size as f64);
-metrics::gauge!(
-    "s3.client.num_auto_default_network_io",
-    metrics.num_auto_default_network_io as f64
-);
-metrics::gauge!(
-    "s3.client.num_auto_ranged_get_network_io",
-    metrics.num_auto_ranged_get_network_io as f64
-);
-metrics::gauge!(
-    "s3.client.num_auto_ranged_put_network_io",
-    metrics.num_auto_ranged_put_network_io as f64
-);
-metrics::gauge!(
-    "s3.client.num_auto_ranged_copy_network_io",
-    metrics.num_auto_ranged_copy_network_io as f64
-);
-metrics::gauge!("s3.client.num_total_network_io", metrics.num_total_network_io() as f64);
-metrics::gauge!(
-    "s3.client.num_requests_stream_queued_waiting",
-    metrics.num_requests_stream_queued_waiting as f64
-);
-metrics::gauge!(
-    "s3.client.num_requests_streaming_response",
-    metrics.num_requests_streaming_response as f64
-);
+metrics::gauge!("s3.client.num_requests_being_processed").set(metrics.num_requests_tracked_requests as f64);
+metrics::gauge!("s3.client.num_requests_being_prepared").set(metrics.num_requests_being_prepared as f64);
+metrics::gauge!("s3.client.request_queue_size").set(metrics.request_queue_size as f64);
+metrics::gauge!("s3.client.num_auto_default_network_io").set(metrics.num_auto_default_network_io as f64);
+metrics::gauge!("s3.client.num_auto_ranged_get_network_io").set(metrics.num_auto_ranged_get_network_io as f64);
+metrics::gauge!("s3.client.num_auto_ranged_put_network_io").set(metrics.num_auto_ranged_put_network_io as f64);
+metrics::gauge!("s3.client.num_auto_ranged_copy_network_io")
+    .set(metrics.num_auto_ranged_copy_network_io as f64);
+metrics::gauge!("s3.client.num_total_network_io").set(metrics.num_total_network_io() as f64);
+metrics::gauge!("s3.client.num_requests_stream_queued_waiting")
+    .set(metrics.num_requests_stream_queued_waiting as f64);
+metrics::gauge!("s3.client.num_requests_streaming_response")
+    .set(metrics.num_requests_streaming_response as f64);
}

fn next_request_counter(&self) -> u64 {
@@ -959,7 +938,7 @@ fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
} else {
">16MiB"
};
metrics::histogram!("s3.meta_requests.throughput_mibs", throughput_mbps, "op" => op, "size" => bucket);
metrics::histogram!("s3.meta_requests.throughput_mibs", "op" => op, "size" => bucket).record(throughput_mbps);
}

#[cfg_attr(not(docs_rs), async_trait)]
3 changes: 1 addition & 2 deletions mountpoint-s3/Cargo.toml
@@ -28,7 +28,7 @@ hex = "0.4.3"
lazy_static = "1.4.0"
libc = "0.2.126"
linked-hash-map = "0.5.6"
metrics = "0.21.1"
metrics = "0.22.1"
nix = { version = "0.27.1", features = ["user"] }
regex = "1.7.1"
serde = { version = "1.0.190", features = ["derive"] }
@@ -62,7 +62,6 @@ proptest = "1.4.0"
proptest-derive = "0.4.0"
rand = "0.8.5"
rand_chacha = "0.3.1"
-rusty-fork = "0.3.0"
serial_test = "2.0.0"
sha2 = "0.10.6"
shuttle = { version = "0.5.0" }
25 changes: 10 additions & 15 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -366,23 +366,23 @@ impl DataCache for DiskDataCache {
match self.read_block(&path, cache_key, block_idx, block_offset) {
Ok(None) => {
// Cache miss.
metrics::counter!("disk_data_cache.block_hit", 0);
metrics::counter!("disk_data_cache.block_hit").increment(0);
Ok(None)
}
Ok(Some(bytes)) => {
// Cache hit.
metrics::counter!("disk_data_cache.block_hit", 1);
metrics::counter!("disk_data_cache.total_bytes", bytes.len() as u64, "type" => "read");
metrics::histogram!("disk_data_cache.read_duration_us", start.elapsed().as_micros() as f64);
metrics::counter!("disk_data_cache.block_hit").increment(1);
metrics::counter!("disk_data_cache.total_bytes", "type" => "read").increment(bytes.len() as u64);
metrics::histogram!("disk_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);
if let Some(usage) = &self.usage {
usage.lock().unwrap().refresh(&block_key);
}
Ok(Some(bytes))
}
Err(err) => {
// Invalid block. Count as cache miss.
metrics::counter!("disk_data_cache.block_hit", 0);
metrics::counter!("disk_data_cache.block_err", 1);
metrics::counter!("disk_data_cache.block_hit").increment(0);
metrics::counter!("disk_data_cache.block_err").increment(1);
match fs::remove_file(&path) {
Ok(()) => {
if let Some(usage) = &self.usage {
@@ -419,20 +419,15 @@ impl DataCache for DiskDataCache {
{
let eviction_start = Instant::now();
let result = self.evict_if_needed();
-metrics::histogram!(
-    "disk_data_cache.eviction_duration_us",
-    eviction_start.elapsed().as_micros() as f64
-);
+metrics::histogram!("disk_data_cache.eviction_duration_us")
+    .record(eviction_start.elapsed().as_micros() as f64);
result
}?;

let write_start = Instant::now();
let size = self.write_block(path, block)?;
-metrics::histogram!(
-    "disk_data_cache.write_duration_us",
-    write_start.elapsed().as_micros() as f64
-);
-metrics::counter!("disk_data_cache.total_bytes", bytes_len as u64, "type" => "write");
+metrics::histogram!("disk_data_cache.write_duration_us").record(write_start.elapsed().as_micros() as f64);
+metrics::counter!("disk_data_cache.total_bytes", "type" => "write").increment(bytes_len as u64);
if let Some(usage) = &self.usage {
usage.lock().unwrap().add(block_key, size);
}
8 changes: 4 additions & 4 deletions mountpoint-s3/src/fs.rs
@@ -117,7 +117,7 @@ where
}
Ok(request) => FileHandleState::Write(UploadState::InProgress { request, handle }),
};
-metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "write");
+metrics::gauge!("fs.current_handles", "type" => "write").increment(1.0);
Ok(handle)
}

@@ -142,7 +142,7 @@ where
.prefetcher
.prefetch(fs.client.clone(), &fs.bucket, &full_key, object_size, etag.clone());
let handle = FileHandleState::Read(request);
-metrics::increment_gauge!("fs.current_handles", 1.0, "type" => "read");
+metrics::gauge!("fs.current_handles", "type" => "read").increment(1.0);
Ok(handle)
}
}
@@ -1136,15 +1136,15 @@ where
let request = match file_handle.state.into_inner() {
FileHandleState::Read { .. } => {
// TODO make sure we cancel the inflight PrefetchingGetRequest. is just dropping enough?
-metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "read");
+metrics::gauge!("fs.current_handles", "type" => "read").decrement(1.0);
file_handle.inode.finish_reading()?;
return Ok(());
}
FileHandleState::Write(request) => request,
};

let result = request.complete_if_in_progress(&file_handle.full_key).await;
-metrics::decrement_gauge!("fs.current_handles", 1.0, "type" => "write");
+metrics::gauge!("fs.current_handles", "type" => "write").decrement(1.0);
// Errors won't actually be seen by the user because `release` is async,
// but it's the right thing to do.
result
18 changes: 9 additions & 9 deletions mountpoint-s3/src/fuse.rs
@@ -40,7 +40,7 @@ macro_rules! fuse_error {
($name:literal, $reply:expr, $err:expr) => {{
let err = $err;
event!(err.level, "{} failed: {:#}", $name, err);
-::metrics::counter!("fuse.op_failures", 1, "op" => $name);
+::metrics::counter!("fuse.op_failures", "op" => $name).increment(1);
$reply.error(err.to_errno());
}};
}
@@ -49,8 +49,8 @@ macro_rules! fuse_unsupported {
macro_rules! fuse_unsupported {
($name:literal, $reply:expr, $err:expr, $level:expr) => {{
event!($level, "{} failed: operation not supported by Mountpoint", $name);
-::metrics::counter!("fuse.op_failures", 1, "op" => $name);
-::metrics::counter!("fuse.op_unimplemented", 1, "op" => $name);
+::metrics::counter!("fuse.op_failures", "op" => $name).increment(1);
+::metrics::counter!("fuse.op_unimplemented", "op" => $name).increment(1);
$reply.error($err);
}};
($name:literal, $reply:expr) => {
@@ -150,8 +150,8 @@ where
Err(err) => fuse_error!("read", reply, err),
}

metrics::counter!("fuse.total_bytes", bytes_sent as u64, "type" => "read");
metrics::histogram!("fuse.io_size", bytes_sent as f64, "type" => "read");
metrics::counter!("fuse.total_bytes", "type" => "read").increment(bytes_sent as u64);
metrics::histogram!("fuse.io_size", "type" => "read").record(bytes_sent as f64);
}

#[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=parent, name=field::Empty))]
@@ -188,7 +188,7 @@ where
match block_on(self.fs.readdir(parent, fh, offset, replier).in_current_span()) {
Ok(_) => {
reply.ok();
metrics::counter!("fuse.readdir.entries", count as u64);
metrics::counter!("fuse.readdir.entries").increment(count as u64);
}
Err(e) => fuse_error!("readdir", reply, e),
}
@@ -234,7 +234,7 @@ where
match block_on(self.fs.readdirplus(parent, fh, offset, replier).in_current_span()) {
Ok(_) => {
reply.ok();
metrics::counter!("fuse.readdirplus.entries", count as u64);
metrics::counter!("fuse.readdirplus.entries").increment(count as u64);
}
Err(e) => fuse_error!("readdirplus", reply, e),
}
@@ -332,8 +332,8 @@ where
) {
Ok(bytes_written) => {
reply.written(bytes_written);
metrics::counter!("fuse.total_bytes", bytes_written as u64, "type" => "write");
metrics::histogram!("fuse.io_size", bytes_written as f64, "type" => "write");
metrics::counter!("fuse.total_bytes", "type" => "write").increment(bytes_written as u64);
metrics::histogram!("fuse.io_size", "type" => "write").record(bytes_written as f64);
}
Err(e) => fuse_error!("write", reply, e),
}
16 changes: 7 additions & 9 deletions mountpoint-s3/src/inode.rs
@@ -192,10 +192,8 @@ impl Superblock {
writing_children.remove(&ino);

if let Ok(state) = inode.get_inode_state() {
-metrics::counter!(
-    "metadata_cache.inode_forgotten_before_expiry",
-    state.stat.is_valid().into(),
-);
+metrics::counter!("metadata_cache.inode_forgotten_before_expiry")
+    .increment(state.stat.is_valid().into());
};
}
}
@@ -674,7 +672,7 @@ impl SuperblockInner {
Some(lookup) => trace!("lookup returned from cache: {:?}", lookup),
None => trace!("no lookup available from cache"),
}
metrics::counter!("metadata_cache.cache_hit", lookup.is_some().into());
metrics::counter!("metadata_cache.cache_hit").increment(lookup.is_some().into());

lookup
}
@@ -1588,8 +1586,8 @@ impl InodeMap {
}

fn insert(&mut self, ino: InodeNo, inode: Inode) -> Option<Inode> {
-metrics::increment_gauge!("fs.inodes", 1.0);
-metrics::increment_gauge!("fs.inode_kinds", 1.0, "kind" => inode.kind().as_str());
+metrics::gauge!("fs.inodes").increment(1.0);
+metrics::gauge!("fs.inode_kinds", "kind" => inode.kind().as_str()).increment(1.0);
self.map.insert(ino, inode).inspect(Self::remove_metrics)
}

@@ -1598,8 +1596,8 @@ impl InodeMap {
}

fn remove_metrics(inode: &Inode) {
-metrics::decrement_gauge!("fs.inodes", 1.0);
-metrics::decrement_gauge!("fs.inode_kinds", 1.0, "kind" => inode.kind().as_str());
+metrics::gauge!("fs.inodes").decrement(1.0);
+metrics::gauge!("fs.inode_kinds", "kind" => inode.kind().as_str()).decrement(1.0);
}
}
