From b0e7358a5bce1827258e930837ec96f5c77c33c3 Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Tue, 21 Nov 2023 19:27:25 +0000
Subject: [PATCH] Improve cache metrics and logging (#619)

Rework how metrics for the cache are collected:

* The disk data cache will collect metrics on block hits, misses, and
  errors, as well as on the duration of reads, writes, and evictions.
* The prefetcher will track how many blocks are served from the cache
  versus requested from the client, and will also measure the total
  cache update time (write + eviction).

Also downgrades the cache's logs from error to warning level.

Signed-off-by: Alessandro Passaro
---
 .../src/data_cache/disk_data_cache.rs        | 46 +++++++++++++++----
 mountpoint-s3/src/prefetch/caching_stream.rs | 23 +++++-----
 2 files changed, 49 insertions(+), 20 deletions(-)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index 1263e884e..6a5877319 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -3,6 +3,7 @@
 use std::fs;
 use std::io::{ErrorKind, Read, Seek, Write};
 use std::path::{Path, PathBuf};
+use std::time::Instant;
 
 use bytes::Bytes;
 use linked_hash_map::LinkedHashMap;
@@ -10,7 +11,7 @@ use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
 use thiserror::Error;
-use tracing::{error, trace, warn};
+use tracing::{trace, warn};
 
 use crate::checksums::IntegrityError;
 use crate::data_cache::DataCacheError;
@@ -145,7 +146,7 @@ impl DiskBlockHeader {
                 Ok(Crc32c::new(data_checksum))
             }
         } else {
-            error!(
+            warn!(
                 s3_key_match, etag_match, block_idx_match, "block data did not match expected values",
             );
@@ -238,7 +239,7 @@ impl DiskDataCache {
         let mut block_version = [0; CACHE_VERSION.len()];
         file.read_exact(&mut block_version)?;
         if block_version != CACHE_VERSION.as_bytes() {
-            error!(
+            warn!(
                 found_version = ?block_version, expected_version = ?CACHE_VERSION, path = ?path.as_ref(),
                 "stale block format found during reading"
             );
@@ -248,7 +249,7 @@
         let block: DiskBlock = match bincode::deserialize_from(&file) {
             Ok(block) => block,
             Err(e) => {
-                error!("block could not be deserialized: {:?}", e);
+                warn!("block could not be deserialized: {:?}", e);
                 return Err(DataCacheError::InvalidBlockContent);
             }
         };
@@ -316,13 +317,13 @@ impl DiskDataCache {
         while self.is_limit_exceeded(usage.lock().unwrap().size) {
             let Some(to_remove) = usage.lock().unwrap().evict_lru() else {
-                error!("cache limit exceeded but nothing to evict");
+                warn!("cache limit exceeded but nothing to evict");
                 return Err(DataCacheError::EvictionFailure);
             };
             let path_to_remove = self.get_path_for_block_key(&to_remove);
             trace!("evicting block at {}", path_to_remove.display());
             if let Err(remove_err) = fs::remove_file(&path_to_remove) {
-                error!("unable to remove invalid block: {:?}", remove_err);
+                warn!("unable to remove invalid block: {:?}", remove_err);
             }
         }
         Ok(())
@@ -350,24 +351,36 @@ impl DataCache for DiskDataCache {
         if block_offset != block_idx * self.config.block_size {
             return Err(DataCacheError::InvalidBlockOffset);
         }
+        let start = Instant::now();
         let block_key = DiskBlockKey::new(cache_key, block_idx);
         let path = self.get_path_for_block_key(&block_key);
         match self.read_block(&path, cache_key, block_idx, block_offset) {
-            Ok(None) => Ok(None),
+            Ok(None) => {
+                // Cache miss.
+                metrics::counter!("disk_data_cache.block_hit", 0);
+                Ok(None)
+            }
             Ok(Some(bytes)) => {
+                // Cache hit.
+ metrics::counter!("disk_data_cache.block_hit", 1); + metrics::counter!("disk_data_cache.total_bytes", bytes.len() as u64, "type" => "read"); + metrics::histogram!("disk_data_cache.read_duration_us", start.elapsed().as_micros() as f64); if let Some(usage) = &self.usage { usage.lock().unwrap().refresh(&block_key); } Ok(Some(bytes)) } Err(err) => { + // Invalid block. Count as cache miss. + metrics::counter!("disk_data_cache.block_hit", 0); + metrics::counter!("disk_data_cache.block_err", 1); match fs::remove_file(&path) { Ok(()) => { if let Some(usage) = &self.usage { usage.lock().unwrap().remove(&block_key); } } - Err(remove_err) => error!("unable to remove invalid block: {:?}", remove_err), + Err(remove_err) => warn!("unable to remove invalid block: {:?}", remove_err), } Err(err) } @@ -385,6 +398,7 @@ impl DataCache for DiskDataCache { return Err(DataCacheError::InvalidBlockOffset); } + let bytes_len = bytes.len(); let block_key = DiskBlockKey::new(&cache_key, block_idx); let path = self.get_path_for_block_key(&block_key); trace!(?cache_key, ?path, "new block will be created in disk cache"); @@ -393,9 +407,23 @@ impl DataCache for DiskDataCache { DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent, })?; - self.evict_if_needed()?; + { + let eviction_start = Instant::now(); + let result = self.evict_if_needed(); + metrics::histogram!( + "disk_data_cache.eviction_duration_us", + eviction_start.elapsed().as_micros() as f64 + ); + result + }?; + let write_start = Instant::now(); let size = self.write_block(path, block)?; + metrics::histogram!( + "disk_data_cache.write_duration_us", + write_start.elapsed().as_micros() as f64 + ); + metrics::counter!("disk_data_cache.total_bytes", bytes_len as u64, "type" => "write"); if let Some(usage) = &self.usage { usage.lock().unwrap().add(block_key, size); } diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index 6b886108d..98f5ca180 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use futures::task::{Spawn, SpawnExt}; use futures::{pin_mut, StreamExt}; use mountpoint_s3_client::{types::ETag, ObjectClient}; -use tracing::{debug_span, error, trace, warn, Instrument}; +use tracing::{debug_span, trace, warn, Instrument}; use crate::checksums::ChecksummedBytes; use crate::data_cache::{BlockIndex, CacheKey, DataCache}; @@ -124,13 +124,12 @@ where Ok(Some(block)) => { trace!(?cache_key, ?range, block_index, "cache hit"); let part = self.make_part(block, block_index, block_offset, &range); - metrics::counter!("cache.total_bytes", part.len() as u64, "type" => "read"); self.part_queue_producer.push(Ok(part)); block_offset += block_size; continue; } Ok(None) => trace!(?cache_key, block_index, ?range, "cache miss - no data for block"), - Err(error) => error!( + Err(error) => warn!( ?cache_key, block_index, ?range, @@ -139,10 +138,14 @@ where ), } // If a block is uncached or reading it fails, fallback to S3 for the rest of the stream. + metrics::counter!("prefetch.blocks_served_from_cache", block_index - block_range.start); + metrics::counter!("prefetch.blocks_requested_to_client", block_range.end - block_index); return self .get_from_client(range.trim_start(block_offset), block_index..block_range.end) .await; } + // We served the whole range from cache. 
+ metrics::counter!("prefetch.blocks_served_from_cache", block_range.end - block_range.start); } async fn get_from_client(&self, range: RequestRange, block_range: Range) { @@ -172,7 +175,7 @@ where { Ok(get_object_result) => get_object_result, Err(e) => { - error!(key, error=?e, "GetObject request failed"); + warn!(key, error=?e, "GetObject request failed"); self.part_queue_producer .push(Err(PrefetchReadError::GetRequestFailed(e))); return; @@ -194,7 +197,7 @@ where let expected_offset = block_offset + buffer.len() as u64; if offset != expected_offset { - error!(key, offset, expected_offset, "wrong offset for GetObject body part"); + warn!(key, offset, expected_offset, "wrong offset for GetObject body part"); self.part_queue_producer .push(Err(PrefetchReadError::GetRequestReturnedWrongOffset { offset, @@ -209,7 +212,7 @@ where let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); let chunk = body.split_to(remaining); if let Err(e) = buffer.extend(chunk.into()) { - error!(key, error=?e, "integrity check for body part failed"); + warn!(key, error=?e, "integrity check for body part failed"); self.part_queue_producer.push(Err(e.into())); return; } @@ -227,7 +230,7 @@ where } } Some(Err(e)) => { - error!(key, error=?e, "GetObject body part failed"); + warn!(key, error=?e, "GetObject body part failed"); self.part_queue_producer .push(Err(PrefetchReadError::GetRequestFailed(e))); break; @@ -260,14 +263,12 @@ where .cache .put_block(self.cache_key.clone(), block_index, block_offset, block.clone()) { - Ok(()) => { - metrics::histogram!("cache.write_duration_us", start.elapsed().as_micros() as f64); - metrics::counter!("cache.total_bytes", block.len() as u64, "type" => "write"); - } + Ok(()) => {} Err(error) => { warn!(key=?self.cache_key.s3_key, block_index, ?error, "failed to update cache"); } }; + metrics::histogram!("prefetch.cache_update_duration_us", start.elapsed().as_micros() as f64); } /// Creates a Part that can be streamed to the prefetcher from the given cache block.