Improve cache metrics and logging (#619)
Rework how metrics for the cache are collected:
* The disk data cache now collects metrics on block hits/misses/errors and on the duration of reads, writes, and evictions.
* The prefetcher now tracks how many blocks are served from the cache versus requested from the client, and also measures the total cache update time (write + eviction).

Also downgrades the cache's log messages from error to warning level. A short sketch of the new metric calls is included below.

Signed-off-by: Alessandro Passaro <[email protected]>
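
For reference, the instrumentation added by this change boils down to a handful of calls to the `metrics` crate macros. The sketch below is illustrative only, not the actual cache code: the metric names and macro forms match the diff further down (the value-taking `counter!`/`histogram!` style), while `hit`, `bytes`, the block counts, and the `Instant` arguments are hypothetical stand-ins for values the real cache and prefetcher compute.

    use std::time::Instant;

    // Disk cache side (disk_data_cache.rs): per-block hit/miss, bytes moved, and read timing.
    // Write and eviction durations are recorded the same way via
    // "disk_data_cache.write_duration_us" and "disk_data_cache.eviction_duration_us",
    // and a failed read also bumps "disk_data_cache.block_err".
    fn record_disk_cache_read(hit: bool, bytes: u64, read_start: Instant) {
        // One sample per lookup: 1 on a hit, 0 on a miss or an invalid block.
        metrics::counter!("disk_data_cache.block_hit", u64::from(hit));
        if hit {
            metrics::counter!("disk_data_cache.total_bytes", bytes, "type" => "read");
            metrics::histogram!("disk_data_cache.read_duration_us", read_start.elapsed().as_micros() as f64);
        }
    }

    // Prefetcher side (caching_stream.rs): how much of a requested range was served from
    // the cache versus fetched through the client, plus the total cache update time.
    fn record_prefetch_range(served_from_cache: u64, requested_to_client: u64, update_start: Instant) {
        metrics::counter!("prefetch.blocks_served_from_cache", served_from_cache);
        metrics::counter!("prefetch.blocks_requested_to_client", requested_to_client);
        metrics::histogram!("prefetch.cache_update_duration_us", update_start.elapsed().as_micros() as f64);
    }

In the real code the timers are started just before the corresponding I/O and sampled right after it, so the histograms capture wall-clock read/write/eviction time in microseconds.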
passaro authored Nov 21, 2023
1 parent b1c1781 · commit b0e7358
Showing 2 changed files with 49 additions and 20 deletions.
46 changes: 37 additions & 9 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -3,14 +3,15 @@
use std::fs;
use std::io::{ErrorKind, Read, Seek, Write};
use std::path::{Path, PathBuf};
use std::time::Instant;

use bytes::Bytes;
use linked_hash_map::LinkedHashMap;
use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use thiserror::Error;
use tracing::{error, trace, warn};
use tracing::{trace, warn};

use crate::checksums::IntegrityError;
use crate::data_cache::DataCacheError;
@@ -145,7 +146,7 @@ impl DiskBlockHeader
Ok(Crc32c::new(data_checksum))
}
} else {
error!(
warn!(
s3_key_match,
etag_match, block_idx_match, "block data did not match expected values",
);
@@ -238,7 +239,7 @@ impl DiskDataCache
let mut block_version = [0; CACHE_VERSION.len()];
file.read_exact(&mut block_version)?;
if block_version != CACHE_VERSION.as_bytes() {
error!(
warn!(
found_version = ?block_version, expected_version = ?CACHE_VERSION, path = ?path.as_ref(),
"stale block format found during reading"
);
@@ -248,7 +249,7 @@
let block: DiskBlock = match bincode::deserialize_from(&file) {
Ok(block) => block,
Err(e) => {
error!("block could not be deserialized: {:?}", e);
warn!("block could not be deserialized: {:?}", e);
return Err(DataCacheError::InvalidBlockContent);
}
};
@@ -316,13 +317,13 @@ impl DiskDataCache

while self.is_limit_exceeded(usage.lock().unwrap().size) {
let Some(to_remove) = usage.lock().unwrap().evict_lru() else {
error!("cache limit exceeded but nothing to evict");
warn!("cache limit exceeded but nothing to evict");
return Err(DataCacheError::EvictionFailure);
};
let path_to_remove = self.get_path_for_block_key(&to_remove);
trace!("evicting block at {}", path_to_remove.display());
if let Err(remove_err) = fs::remove_file(&path_to_remove) {
error!("unable to remove invalid block: {:?}", remove_err);
warn!("unable to remove invalid block: {:?}", remove_err);
}
}
Ok(())
@@ -350,24 +351,36 @@ impl DataCache for DiskDataCache
if block_offset != block_idx * self.config.block_size {
return Err(DataCacheError::InvalidBlockOffset);
}
let start = Instant::now();
let block_key = DiskBlockKey::new(cache_key, block_idx);
let path = self.get_path_for_block_key(&block_key);
match self.read_block(&path, cache_key, block_idx, block_offset) {
Ok(None) => Ok(None),
Ok(None) => {
// Cache miss.
metrics::counter!("disk_data_cache.block_hit", 0);
Ok(None)
}
Ok(Some(bytes)) => {
// Cache hit.
metrics::counter!("disk_data_cache.block_hit", 1);
metrics::counter!("disk_data_cache.total_bytes", bytes.len() as u64, "type" => "read");
metrics::histogram!("disk_data_cache.read_duration_us", start.elapsed().as_micros() as f64);
if let Some(usage) = &self.usage {
usage.lock().unwrap().refresh(&block_key);
}
Ok(Some(bytes))
}
Err(err) => {
// Invalid block. Count as cache miss.
metrics::counter!("disk_data_cache.block_hit", 0);
metrics::counter!("disk_data_cache.block_err", 1);
match fs::remove_file(&path) {
Ok(()) => {
if let Some(usage) = &self.usage {
usage.lock().unwrap().remove(&block_key);
}
}
Err(remove_err) => error!("unable to remove invalid block: {:?}", remove_err),
Err(remove_err) => warn!("unable to remove invalid block: {:?}", remove_err),
}
Err(err)
}
@@ -385,6 +398,7 @@ impl DataCache for DiskDataCache
return Err(DataCacheError::InvalidBlockOffset);
}

let bytes_len = bytes.len();
let block_key = DiskBlockKey::new(&cache_key, block_idx);
let path = self.get_path_for_block_key(&block_key);
trace!(?cache_key, ?path, "new block will be created in disk cache");
@@ -393,9 +407,23 @@
DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
})?;

self.evict_if_needed()?;
{
let eviction_start = Instant::now();
let result = self.evict_if_needed();
metrics::histogram!(
"disk_data_cache.eviction_duration_us",
eviction_start.elapsed().as_micros() as f64
);
result
}?;

let write_start = Instant::now();
let size = self.write_block(path, block)?;
metrics::histogram!(
"disk_data_cache.write_duration_us",
write_start.elapsed().as_micros() as f64
);
metrics::counter!("disk_data_cache.total_bytes", bytes_len as u64, "type" => "write");
if let Some(usage) = &self.usage {
usage.lock().unwrap().add(block_key, size);
}
23 changes: 12 additions & 11 deletions mountpoint-s3/src/prefetch/caching_stream.rs
@@ -5,7 +5,7 @@ use bytes::Bytes;
use futures::task::{Spawn, SpawnExt};
use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::{types::ETag, ObjectClient};
use tracing::{debug_span, error, trace, warn, Instrument};
use tracing::{debug_span, trace, warn, Instrument};

use crate::checksums::ChecksummedBytes;
use crate::data_cache::{BlockIndex, CacheKey, DataCache};
@@ -124,13 +124,12 @@ where
Ok(Some(block)) => {
trace!(?cache_key, ?range, block_index, "cache hit");
let part = self.make_part(block, block_index, block_offset, &range);
metrics::counter!("cache.total_bytes", part.len() as u64, "type" => "read");
self.part_queue_producer.push(Ok(part));
block_offset += block_size;
continue;
}
Ok(None) => trace!(?cache_key, block_index, ?range, "cache miss - no data for block"),
Err(error) => error!(
Err(error) => warn!(
?cache_key,
block_index,
?range,
@@ -139,10 +138,14 @@
),
}
// If a block is uncached or reading it fails, fallback to S3 for the rest of the stream.
metrics::counter!("prefetch.blocks_served_from_cache", block_index - block_range.start);
metrics::counter!("prefetch.blocks_requested_to_client", block_range.end - block_index);
return self
.get_from_client(range.trim_start(block_offset), block_index..block_range.end)
.await;
}
// We served the whole range from cache.
metrics::counter!("prefetch.blocks_served_from_cache", block_range.end - block_range.start);
}

async fn get_from_client(&self, range: RequestRange, block_range: Range<u64>) {
@@ -172,7 +175,7 @@ where
{
Ok(get_object_result) => get_object_result,
Err(e) => {
error!(key, error=?e, "GetObject request failed");
warn!(key, error=?e, "GetObject request failed");
self.part_queue_producer
.push(Err(PrefetchReadError::GetRequestFailed(e)));
return;
@@ -194,7 +197,7 @@ where

let expected_offset = block_offset + buffer.len() as u64;
if offset != expected_offset {
error!(key, offset, expected_offset, "wrong offset for GetObject body part");
warn!(key, offset, expected_offset, "wrong offset for GetObject body part");
self.part_queue_producer
.push(Err(PrefetchReadError::GetRequestReturnedWrongOffset {
offset,
@@ -209,7 +212,7 @@ where
let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len());
let chunk = body.split_to(remaining);
if let Err(e) = buffer.extend(chunk.into()) {
error!(key, error=?e, "integrity check for body part failed");
warn!(key, error=?e, "integrity check for body part failed");
self.part_queue_producer.push(Err(e.into()));
return;
}
@@ -227,7 +230,7 @@ where
}
}
Some(Err(e)) => {
error!(key, error=?e, "GetObject body part failed");
warn!(key, error=?e, "GetObject body part failed");
self.part_queue_producer
.push(Err(PrefetchReadError::GetRequestFailed(e)));
break;
@@ -260,14 +263,12 @@ where
.cache
.put_block(self.cache_key.clone(), block_index, block_offset, block.clone())
{
Ok(()) => {
metrics::histogram!("cache.write_duration_us", start.elapsed().as_micros() as f64);
metrics::counter!("cache.total_bytes", block.len() as u64, "type" => "write");
}
Ok(()) => {}
Err(error) => {
warn!(key=?self.cache_key.s3_key, block_index, ?error, "failed to update cache");
}
};
metrics::histogram!("prefetch.cache_update_duration_us", start.elapsed().as_micros() as f64);
}

/// Creates a Part that can be streamed to the prefetcher from the given cache block.
