From 9206ed4847bbf2574dc7650483e2126b89a14d10 Mon Sep 17 00:00:00 2001 From: Volodkin Vladislav Date: Wed, 13 Nov 2024 13:56:48 +0000 Subject: [PATCH] Bypass the shared cache for large objects (#1117) ## Description of change This change makes `get_block` and `put_block` for objects larger than `1MiB` be a no-op in the shared cache. Relevant issues: N/A ## Does this change impact existing behavior? No, it is under the feature flag. ## Does this change need a changelog entry in any of the crates? Yes, in the following PRs. --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). --------- Signed-off-by: Vlad Volodkin Co-authored-by: Vlad Volodkin --- mountpoint-s3/src/cli.rs | 39 +++---- mountpoint-s3/src/data_cache.rs | 4 +- .../src/data_cache/disk_data_cache.rs | 38 ++++--- .../src/data_cache/express_data_cache.rs | 106 ++++++++++++++---- .../src/data_cache/in_memory_data_cache.rs | 24 ++-- .../src/data_cache/multilevel_cache.rs | 100 +++++++++++++---- mountpoint-s3/src/prefetch/caching_stream.rs | 32 ++++-- 7 files changed, 252 insertions(+), 91 deletions(-) diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index ccfc887e8..f0ead18c0 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -28,7 +28,8 @@ use regex::Regex; use sysinfo::{RefreshKind, System}; use crate::data_cache::{ - CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir, MultilevelDataCache, + CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ExpressDataCacheConfig, ManagedCacheDir, + MultilevelDataCache, }; use crate::fs::{CacheConfig, ServerSideEncryption, TimeToLive}; use crate::fuse::session::FuseSession; @@ -445,7 +446,17 @@ impl CliArgs { None } - fn disk_data_cache_config(&self) -> Option<(&Path, DiskDataCacheConfig)> { + fn express_data_cache_config(&self) -> Option<(ExpressDataCacheConfig, &str, &str)> { + let express_bucket_name = self.cache_express_bucket_name()?; + let config = ExpressDataCacheConfig { + block_size: self.cache_block_size_in_bytes(), + ..Default::default() + }; + + Some((config, &self.bucket_name, express_bucket_name)) + } + + fn disk_data_cache_config(&self) -> Option<(DiskDataCacheConfig, &Path)> { match self.cache.as_ref() { Some(path) => { let cache_limit = match self.max_cache_size { @@ -460,7 +471,7 @@ impl CliArgs { block_size: self.cache_block_size_in_bytes(), limit: cache_limit, }; - Some((path.as_path(), cache_config)) + Some((cache_config, path.as_path())) } None => None, } @@ -880,15 +891,10 @@ where tracing::trace!("using metadata TTL setting {metadata_cache_ttl:?}"); filesystem_config.cache_config = CacheConfig::new(metadata_cache_ttl); - match (args.disk_data_cache_config(), args.cache_express_bucket_name()) { - (None, Some(express_bucket_name)) => { + match (args.disk_data_cache_config(), args.express_data_cache_config()) { + (None, Some((config, bucket_name, cache_bucket_name))) => { tracing::trace!("using S3 Express One Zone bucket as a cache for object content"); - let express_cache = ExpressDataCache::new( - express_bucket_name, - client.clone(), - &args.bucket_name, - args.cache_block_size_in_bytes(), - ); + let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name); let prefetcher = caching_prefetch(express_cache, runtime, prefetcher_config); let fuse_session = create_filesystem( @@ -903,7 
+909,7 @@ where Ok(fuse_session) } - (Some((cache_dir_path, disk_data_cache_config)), None) => { + (Some((disk_data_cache_config, cache_dir_path)), None) => { tracing::trace!("using local disk as a cache for object content"); let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?; @@ -924,15 +930,10 @@ where Ok(fuse_session) } - (Some((cache_dir_path, disk_data_cache_config)), Some(express_bucket_name)) => { + (Some((disk_data_cache_config, cache_dir_path)), Some((config, bucket_name, cache_bucket_name))) => { tracing::trace!("using both local disk and S3 Express One Zone bucket as a cache for object content"); let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?; - let express_cache = ExpressDataCache::new( - express_bucket_name, - client.clone(), - &args.bucket_name, - args.cache_block_size_in_bytes(), - ); + let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name); let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone()); let prefetcher = caching_prefetch(cache, runtime, prefetcher_config); diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs index d9a6f6585..d7b6ca5d5 100644 --- a/mountpoint-s3/src/data_cache.rs +++ b/mountpoint-s3/src/data_cache.rs @@ -16,7 +16,7 @@ use thiserror::Error; pub use crate::checksums::ChecksummedBytes; pub use crate::data_cache::cache_directory::ManagedCacheDir; pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig}; -pub use crate::data_cache::express_data_cache::ExpressDataCache; +pub use crate::data_cache::express_data_cache::{ExpressDataCache, ExpressDataCacheConfig}; pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache; pub use crate::data_cache::multilevel_cache::MultilevelDataCache; @@ -54,6 +54,7 @@ pub trait DataCache { cache_key: &ObjectId, block_idx: BlockIndex, block_offset: u64, + object_size: usize, ) -> DataCacheResult>; /// Put block of data to the cache for the given [ObjectId] and [BlockIndex]. @@ -63,6 +64,7 @@ pub trait DataCache { block_idx: BlockIndex, block_offset: u64, bytes: ChecksummedBytes, + object_size: usize, ) -> DataCacheResult<()>; /// Returns the block size for the data cache. 
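Note on the trait change above: the new `object_size` argument is what lets a cache implementation decide, per call, whether to participate at all. Below is a minimal, self-contained sketch of that idea, using simplified stand-in types (`ToyCache`, `Block`, and `MAX_OBJECT_SIZE` are hypothetical names for illustration, not the crate's actual `DataCache` trait or the code in this patch):

```rust
// Illustrative sketch only: simplified stand-ins for the DataCache types in this crate.
use std::collections::HashMap;

type Block = Vec<u8>;

/// Objects larger than this are not cached (mirrors the 1 MiB default introduced by this change).
const MAX_OBJECT_SIZE: usize = 1024 * 1024;

#[derive(Default)]
struct ToyCache {
    blocks: HashMap<(String, u64), Block>,
}

impl ToyCache {
    /// No-op for objects above the size threshold; otherwise store the block.
    fn put_block(&mut self, key: &str, block_idx: u64, bytes: Block, object_size: usize) {
        if object_size > MAX_OBJECT_SIZE {
            return; // bypass: large objects are never written to the shared cache
        }
        self.blocks.insert((key.to_owned(), block_idx), bytes);
    }

    /// Returns `None` (a cache miss) for objects above the size threshold.
    fn get_block(&self, key: &str, block_idx: u64, object_size: usize) -> Option<&Block> {
        if object_size > MAX_OBJECT_SIZE {
            return None; // bypass: large objects are never served from the shared cache
        }
        self.blocks.get(&(key.to_owned(), block_idx))
    }
}

fn main() {
    let mut cache = ToyCache::default();
    cache.put_block("small", 0, vec![1, 2, 3], 3);
    cache.put_block("large", 0, vec![0; 1024], MAX_OBJECT_SIZE + 1);
    assert!(cache.get_block("small", 0, 3).is_some());
    assert!(cache.get_block("large", 0, MAX_OBJECT_SIZE + 1).is_none());
    println!("large objects bypass the cache");
}
```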
diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index ab41ad52b..9b33d1572 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -369,6 +369,7 @@ impl DataCache for DiskDataCache { cache_key: &ObjectId, block_idx: BlockIndex, block_offset: u64, + _object_size: usize, ) -> DataCacheResult> { if block_offset != block_idx * self.config.block_size { return Err(DataCacheError::InvalidBlockOffset); @@ -418,6 +419,7 @@ impl DataCache for DiskDataCache { block_idx: BlockIndex, block_offset: u64, bytes: ChecksummedBytes, + _object_size: usize, ) -> DataCacheResult<()> { if block_offset != block_idx * self.config.block_size { return Err(DataCacheError::InvalidBlockOffset); @@ -645,6 +647,9 @@ mod tests { let data_2 = ChecksummedBytes::new("Bar".into()); let data_3 = ChecksummedBytes::new("Baz".into()); + let object_1_size = data_1.len() + data_3.len(); + let object_2_size = data_2.len(); + let block_size = 8 * 1024 * 1024; let cache_directory = tempfile::tempdir().unwrap(); let cache = DiskDataCache::new( @@ -661,7 +666,7 @@ mod tests { ); let block = cache - .get_block(&cache_key_1, 0, 0) + .get_block(&cache_key_1, 0, 0, object_1_size) .await .expect("cache should be accessible"); assert!( @@ -672,11 +677,11 @@ mod tests { // PUT and GET, OK? cache - .put_block(cache_key_1.clone(), 0, 0, data_1.clone()) + .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size) .await .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_1, 0, 0) + .get_block(&cache_key_1, 0, 0, object_1_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -687,11 +692,11 @@ mod tests { // PUT AND GET a second file, OK? cache - .put_block(cache_key_2.clone(), 0, 0, data_2.clone()) + .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size) .await .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_2, 0, 0) + .get_block(&cache_key_2, 0, 0, object_2_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -702,11 +707,11 @@ mod tests { // PUT AND GET a second block in a cache entry, OK? 
cache - .put_block(cache_key_1.clone(), 1, block_size, data_3.clone()) + .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size) .await .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_1, 1, block_size) + .get_block(&cache_key_1, 1, block_size, object_1_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -717,7 +722,7 @@ mod tests { // Entry 1's first block still intact let entry = cache - .get_block(&cache_key_1, 0, 0) + .get_block(&cache_key_1, 0, 0, object_1_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -743,11 +748,11 @@ mod tests { let cache_key = ObjectId::new("a".into(), ETag::for_tests()); cache - .put_block(cache_key.clone(), 0, 0, slice.clone()) + .put_block(cache_key.clone(), 0, 0, slice.clone(), slice.len()) .await .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key, 0, 0) + .get_block(&cache_key, 0, 0, slice.len()) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -778,9 +783,10 @@ mod tests { cache_key: &ObjectId, block_idx: u64, expected_bytes: &ChecksummedBytes, + object_size: usize, ) -> bool { if let Some(retrieved) = cache - .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64) + .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64, object_size) .await .expect("cache should be accessible") { @@ -828,6 +834,7 @@ mod tests { block_idx as u64, (block_idx * BLOCK_SIZE) as u64, bytes.clone(), + LARGE_OBJECT_SIZE, ) .await .unwrap(); @@ -841,13 +848,16 @@ mod tests { block_idx as u64, (block_idx * BLOCK_SIZE) as u64, bytes.clone(), + SMALL_OBJECT_SIZE, ) .await .unwrap(); } let count_small_object_blocks_in_cache = futures::stream::iter(small_object_blocks.iter().enumerate()) - .filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes)) + .filter(|&(block_idx, bytes)| { + is_block_in_cache(&cache, &small_object_key, block_idx as u64, bytes, SMALL_OBJECT_SIZE) + }) .count() .await; assert_eq!( @@ -857,7 +867,9 @@ mod tests { ); let count_large_object_blocks_in_cache = futures::stream::iter(large_object_blocks.iter().enumerate()) - .filter(|&(block_idx, bytes)| is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes)) + .filter(|&(block_idx, bytes)| { + is_block_in_cache(&cache, &large_object_key, block_idx as u64, bytes, LARGE_OBJECT_SIZE) + }) .count() .await; assert!( diff --git a/mountpoint-s3/src/data_cache/express_data_cache.rs b/mountpoint-s3/src/data_cache/express_data_cache.rs index 296121f85..a9fac357a 100644 --- a/mountpoint-s3/src/data_cache/express_data_cache.rs +++ b/mountpoint-s3/src/data_cache/express_data_cache.rs @@ -13,12 +13,31 @@ use tracing::Instrument; const CACHE_VERSION: &str = "V1"; +/// Configuration for a [ExpressDataCache]. +#[derive(Debug)] +pub struct ExpressDataCacheConfig { + /// Size of data blocks. + pub block_size: u64, + /// The maximum size of an object to be cached. + pub max_object_size: usize, +} + +impl Default for ExpressDataCacheConfig { + fn default() -> Self { + Self { + block_size: 1024 * 1024, // 1 MiB + max_object_size: 1024 * 1024, // 1 MiB + } + } +} + /// A data cache on S3 Express One Zone that can be shared across Mountpoint instances. pub struct ExpressDataCache { client: Client, - bucket_name: String, prefix: String, - block_size: u64, + config: ExpressDataCacheConfig, + /// Name of the S3 Express bucket to store the blocks. 
+ bucket_name: String, } impl From> for DataCacheError @@ -38,19 +57,19 @@ where /// Create a new instance. /// /// TODO: consider adding some validation of the bucket. - pub fn new(bucket_name: &str, client: Client, source_description: &str, block_size: u64) -> Self { + pub fn new(client: Client, config: ExpressDataCacheConfig, source_bucket_name: &str, bucket_name: &str) -> Self { let prefix = hex::encode( Sha256::new() .chain_update(CACHE_VERSION.as_bytes()) - .chain_update(block_size.to_be_bytes()) - .chain_update(source_description.as_bytes()) + .chain_update(config.block_size.to_be_bytes()) + .chain_update(source_bucket_name.as_bytes()) .finalize(), ); Self { client, - bucket_name: bucket_name.to_owned(), prefix, - block_size, + config, + bucket_name: bucket_name.to_owned(), } } } @@ -65,8 +84,13 @@ where cache_key: &ObjectId, block_idx: BlockIndex, block_offset: u64, + object_size: usize, ) -> DataCacheResult> { - if block_offset != block_idx * self.block_size { + if object_size > self.config.max_object_size { + return Ok(None); + } + + if block_offset != block_idx * self.config.block_size { return Err(DataCacheError::InvalidBlockOffset); } @@ -83,7 +107,7 @@ where pin_mut!(result); // Guarantee that the request will start even in case of `initial_read_window == 0`. - result.as_mut().increment_read_window(self.block_size as usize); + result.as_mut().increment_read_window(self.config.block_size as usize); // TODO: optimize for the common case of a single chunk. let mut buffer = BytesMut::default(); @@ -96,7 +120,7 @@ where buffer.extend_from_slice(&body); // Ensure the flow-control window is large enough. - result.as_mut().increment_read_window(self.block_size as usize); + result.as_mut().increment_read_window(self.config.block_size as usize); } Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None), Err(e) => return Err(e.into()), @@ -112,8 +136,13 @@ where block_idx: BlockIndex, block_offset: u64, bytes: ChecksummedBytes, + object_size: usize, ) -> DataCacheResult<()> { - if block_offset != block_idx * self.block_size { + if object_size > self.config.max_object_size { + return Ok(()); + } + + if block_offset != block_idx * self.config.block_size { return Err(DataCacheError::InvalidBlockOffset); } @@ -134,7 +163,7 @@ where } fn block_size(&self) -> u64 { - self.block_size + self.config.block_size } } @@ -173,12 +202,19 @@ mod tests { }; let client = Arc::new(MockClient::new(config)); - let cache = ExpressDataCache::new(bucket, client, "unique source description", block_size); + let config = ExpressDataCacheConfig { + block_size, + ..Default::default() + }; + let cache = ExpressDataCache::new(client, config, "unique source description", bucket); let data_1 = ChecksummedBytes::new("Foo".into()); let data_2 = ChecksummedBytes::new("Bar".into()); let data_3 = ChecksummedBytes::new("a".repeat(block_size as usize).into()); + let object_1_size = data_1.len() + data_3.len(); + let object_2_size = data_2.len(); + let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests()); let cache_key_2 = ObjectId::new( "longkey_".repeat(128), // 1024 bytes, max length for S3 keys @@ -186,7 +222,7 @@ mod tests { ); let block = cache - .get_block(&cache_key_1, 0, 0) + .get_block(&cache_key_1, 0, 0, object_1_size) .await .expect("cache should be accessible"); assert!( @@ -197,11 +233,11 @@ mod tests { // PUT and GET, OK? 
cache - .put_block(cache_key_1.clone(), 0, 0, data_1.clone()) + .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size) .await .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_1, 0, 0) + .get_block(&cache_key_1, 0, 0, object_1_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -212,11 +248,11 @@ mod tests { // PUT AND GET block for a second key, OK? cache - .put_block(cache_key_2.clone(), 0, 0, data_2.clone()) + .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size) .await .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_2, 0, 0) + .get_block(&cache_key_2, 0, 0, object_2_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -227,11 +263,11 @@ mod tests { // PUT AND GET a second block in a cache entry, OK? cache - .put_block(cache_key_1.clone(), 1, block_size, data_3.clone()) + .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size) .await .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_1, 1, block_size) + .get_block(&cache_key_1, 1, block_size, object_1_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -242,7 +278,7 @@ mod tests { // Entry 1's first block still intact let entry = cache - .get_block(&cache_key_1, 0, 0) + .get_block(&cache_key_1, 0, 0, object_1_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -251,4 +287,32 @@ mod tests { "cache entry returned should match original bytes after put" ); } + + #[tokio::test] + async fn large_object_bypassed() { + let bucket = "test-bucket"; + let config = MockClientConfig { + bucket: bucket.to_string(), + part_size: 8 * 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 8 * 1024 * 1024, + ..Default::default() + }; + let client = Arc::new(MockClient::new(config)); + let cache = ExpressDataCache::new(client.clone(), Default::default(), "unique source description", bucket); + let data_1 = vec![0u8; 1024 * 1024 + 1]; + let data_1 = ChecksummedBytes::new(data_1.into()); + let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests()); + // PUT and GET for a large object should be no-op + cache + .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), data_1.len()) + .await + .expect("cache should be accessible"); + let get_result = cache + .get_block(&cache_key_1, 0, 0, data_1.len()) + .await + .expect("cache should be accessible"); + assert!(get_result.is_none()); + assert_eq!(client.object_count(), 0, "cache must be empty"); + } } diff --git a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs index 18c6335c3..eb921978d 100644 --- a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs +++ b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs @@ -38,6 +38,7 @@ impl DataCache for InMemoryDataCache { cache_key: &ObjectId, block_idx: BlockIndex, block_offset: u64, + _object_size: usize, ) -> DataCacheResult> { if block_offset != block_idx * self.block_size { return Err(DataCacheError::InvalidBlockOffset); @@ -53,6 +54,7 @@ impl DataCache for InMemoryDataCache { block_idx: BlockIndex, block_offset: u64, bytes: ChecksummedBytes, + _object_size: usize, ) -> DataCacheResult<()> { if block_offset != block_idx * self.block_size { return Err(DataCacheError::InvalidBlockOffset); @@ -84,12 +86,18 @@ mod tests { let data_3 = Bytes::from_static(b"Baz"); let data_3 
= ChecksummedBytes::new(data_3.clone()); + let object_1_size = data_1.len() + data_3.len(); + let object_2_size = data_2.len(); + let block_size = 8 * 1024 * 1024; let cache = InMemoryDataCache::new(block_size); let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests()); let cache_key_2 = ObjectId::new("b".into(), ETag::for_tests()); - let block = cache.get_block(&cache_key_1, 0, 0).await.expect("cache is accessible"); + let block = cache + .get_block(&cache_key_1, 0, 0, object_1_size) + .await + .expect("cache is accessible"); assert!( block.is_none(), "no entry should be available to return but got {:?}", @@ -98,11 +106,11 @@ mod tests { // PUT and GET, OK? cache - .put_block(cache_key_1.clone(), 0, 0, data_1.clone()) + .put_block(cache_key_1.clone(), 0, 0, data_1.clone(), object_1_size) .await .expect("cache is accessible"); let entry = cache - .get_block(&cache_key_1, 0, 0) + .get_block(&cache_key_1, 0, 0, object_1_size) .await .expect("cache is accessible") .expect("cache entry should be returned"); @@ -113,11 +121,11 @@ mod tests { // PUT AND GET a second file, OK? cache - .put_block(cache_key_2.clone(), 0, 0, data_2.clone()) + .put_block(cache_key_2.clone(), 0, 0, data_2.clone(), object_2_size) .await .expect("cache is accessible"); let entry = cache - .get_block(&cache_key_2, 0, 0) + .get_block(&cache_key_2, 0, 0, object_2_size) .await .expect("cache is accessible") .expect("cache entry should be returned"); @@ -128,11 +136,11 @@ mod tests { // PUT AND GET a second block in a cache entry, OK? cache - .put_block(cache_key_1.clone(), 1, block_size, data_3.clone()) + .put_block(cache_key_1.clone(), 1, block_size, data_3.clone(), object_1_size) .await .expect("cache is accessible"); let entry = cache - .get_block(&cache_key_1, 1, block_size) + .get_block(&cache_key_1, 1, block_size, object_1_size) .await .expect("cache is accessible") .expect("cache entry should be returned"); @@ -143,7 +151,7 @@ mod tests { // Entry 1's first block still intact let entry = cache - .get_block(&cache_key_1, 0, 0) + .get_block(&cache_key_1, 0, 0, object_1_size) .await .expect("cache is accessible") .expect("cache entry should be returned"); diff --git a/mountpoint-s3/src/data_cache/multilevel_cache.rs b/mountpoint-s3/src/data_cache/multilevel_cache.rs index a745865b8..403b23313 100644 --- a/mountpoint-s3/src/data_cache/multilevel_cache.rs +++ b/mountpoint-s3/src/data_cache/multilevel_cache.rs @@ -47,8 +47,13 @@ where cache_key: &ObjectId, block_idx: BlockIndex, block_offset: u64, + object_size: usize, ) -> DataCacheResult> { - match self.disk_cache.get_block(cache_key, block_idx, block_offset).await { + match self + .disk_cache + .get_block(cache_key, block_idx, block_offset, object_size) + .await + { Ok(Some(data)) => { trace!(cache_key=?cache_key, block_idx=block_idx, "block served from the disk cache"); return DataCacheResult::Ok(Some(data)); @@ -57,7 +62,11 @@ where Err(err) => warn!(cache_key=?cache_key, block_idx=block_idx, ?err, "error reading block from disk cache"), } - if let Some(data) = self.express_cache.get_block(cache_key, block_idx, block_offset).await? { + if let Some(data) = self + .express_cache + .get_block(cache_key, block_idx, block_offset, object_size) + .await? 
+ { trace!(cache_key=?cache_key, block_idx=block_idx, "block served from the express cache"); let cache_key = cache_key.clone(); let disk_cache = self.disk_cache.clone(); @@ -65,7 +74,7 @@ where self.runtime .spawn(async move { if let Err(error) = disk_cache - .put_block(cache_key.clone(), block_idx, block_offset, data_cloned) + .put_block(cache_key.clone(), block_idx, block_offset, data_cloned, object_size) .await { warn!(cache_key=?cache_key, block_idx, ?error, "failed to update the local cache"); @@ -85,17 +94,18 @@ where block_idx: BlockIndex, block_offset: u64, bytes: ChecksummedBytes, + object_size: usize, ) -> DataCacheResult<()> { if let Err(error) = self .disk_cache - .put_block(cache_key.clone(), block_idx, block_offset, bytes.clone()) + .put_block(cache_key.clone(), block_idx, block_offset, bytes.clone(), object_size) .await { warn!(cache_key=?cache_key, block_idx, ?error, "failed to update the local cache"); } self.express_cache - .put_block(cache_key, block_idx, block_offset, bytes) + .put_block(cache_key, block_idx, block_offset, bytes, object_size) .await } @@ -141,10 +151,8 @@ mod tests { ..Default::default() }; let client = MockClient::new(config); - ( - client.clone(), - ExpressDataCache::new(bucket, client, "unique source description", BLOCK_SIZE), - ) + let cache = ExpressDataCache::new(client.clone(), Default::default(), "unique source description", bucket); + (client, cache) } #[test_case(false, true; "get from local")] @@ -158,11 +166,12 @@ mod tests { let cache = MultilevelDataCache::new(disk_cache, express_cache, runtime); let data = ChecksummedBytes::new("Foo".into()); + let object_size = data.len(); let cache_key = ObjectId::new("a".into(), ETag::for_tests()); // put in both caches cache - .put_block(cache_key.clone(), 0, 0, data.clone()) + .put_block(cache_key.clone(), 0, 0, data.clone(), object_size) .await .expect("put should succeed"); @@ -176,7 +185,7 @@ mod tests { // check we can retrieve an entry from one of the caches unless both were cleaned up let entry = cache - .get_block(&cache_key, 0, 0) + .get_block(&cache_key, 0, 0, object_size) .await .expect("cache should be accessible"); @@ -197,9 +206,10 @@ mod tests { let (client, express_cache) = create_express_cache(); let data = ChecksummedBytes::new("Foo".into()); + let object_size = data.len(); let cache_key = ObjectId::new("a".into(), ETag::for_tests()); express_cache - .put_block(cache_key.clone(), 0, 0, data.clone()) + .put_block(cache_key.clone(), 0, 0, data.clone(), object_size) .await .expect("put should succeed"); @@ -208,7 +218,7 @@ mod tests { // get from express, put entry in the local cache let entry = cache - .get_block(&cache_key, 0, 0) + .get_block(&cache_key, 0, 0, object_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -224,7 +234,7 @@ mod tests { let mut retries = 10; let entry = loop { let entry = cache - .get_block(&cache_key, 0, 0) + .get_block(&cache_key, 0, 0, object_size) .await .expect("cache should be accessible"); if let Some(entry_data) = entry { @@ -255,16 +265,34 @@ mod tests { let cache_key_in_both = ObjectId::new("key_in_both".into(), ETag::for_tests()); // put a key to local only disk_cache - .put_block(cache_key_in_local.clone(), 0, 0, local_data_1.clone()) + .put_block( + cache_key_in_local.clone(), + 0, + 0, + local_data_1.clone(), + local_data_1.len(), + ) .await .expect("put should succeed"); // put another key to both caches, but store different data in those disk_cache - .put_block(cache_key_in_both.clone(), 0, 0, 
local_data_2.clone()) + .put_block( + cache_key_in_both.clone(), + 0, + 0, + local_data_2.clone(), + local_data_2.len(), + ) .await .expect("put should succeed"); express_cache - .put_block(cache_key_in_both.clone(), 0, 0, express_data.clone()) + .put_block( + cache_key_in_both.clone(), + 0, + 0, + express_data.clone(), + express_data.len(), + ) .await .expect("put should succeed"); @@ -273,7 +301,7 @@ mod tests { // get data, which is stored in local only let entry = cache - .get_block(&cache_key_in_local, 0, 0) + .get_block(&cache_key_in_local, 0, 0, local_data_1.len()) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -284,7 +312,7 @@ mod tests { // get data, which is stored in both caches and check that local has a priority let entry = cache - .get_block(&cache_key_in_both, 0, 0) + .get_block(&cache_key_in_both, 0, 0, local_data_2.len()) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -300,9 +328,10 @@ mod tests { let (_, express_cache) = create_express_cache(); let data = ChecksummedBytes::new("Foo".into()); + let object_size = data.len(); let cache_key = ObjectId::new("a".into(), ETag::for_tests()); express_cache - .put_block(cache_key.clone(), 0, 0, data.clone()) + .put_block(cache_key.clone(), 0, 0, data.clone(), object_size) .await .expect("put should succeed"); @@ -310,7 +339,7 @@ mod tests { let cache = MultilevelDataCache::new(disk_cache, express_cache, runtime); let entry = cache - .get_block(&cache_key, 0, 0) + .get_block(&cache_key, 0, 0, object_size) .await .expect("cache should be accessible") .expect("cache entry should be returned"); @@ -319,4 +348,33 @@ mod tests { "cache entry returned should match original bytes after put" ); } + + #[tokio::test] + async fn large_object_bypassed() { + let (cache_dir, disk_cache) = create_disk_cache(); + let (client, express_cache) = create_express_cache(); + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let cache = MultilevelDataCache::new(disk_cache, express_cache, runtime); + + let data = vec![0u8; 1024 * 1024 + 1]; + let data = ChecksummedBytes::new(data.into()); + let object_size = data.len(); + let cache_key = ObjectId::new("a".into(), ETag::for_tests()); + + // put in both caches, this must be a no-op for the express cache + cache + .put_block(cache_key.clone(), 0, 0, data.clone(), object_size) + .await + .expect("put should succeed"); + + assert_eq!(client.object_count(), 0, "cache must be empty"); + + // try to get from the cache, assuming it is missing in local + cache_dir.close().expect("should clean up local cache"); + let entry = cache + .get_block(&cache_key, 0, 0, object_size) + .await + .expect("cache should be accessible"); + assert!(entry.is_none(), "cache miss is expected for a large object"); + } } diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index 7d7d044c8..2eeaba795 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -142,7 +142,11 @@ where // already likely negligible. 
let mut block_offset = block_range.start * block_size; for block_index in block_range.clone() { - match self.cache.get_block(cache_key, block_index, block_offset).await { + match self + .cache + .get_block(cache_key, block_index, block_offset, range.object_size()) + .await + { Ok(Some(block)) => { trace!(?cache_key, ?range, block_index, "cache hit"); // Cache blocks always contain bytes in the request range @@ -227,7 +231,7 @@ where cache: self.cache.clone(), runtime: self.runtime.clone(), }; - part_composer.try_compose_parts(request_stream).await; + part_composer.try_compose_parts(request_stream, range).await; } fn block_indices_for_byte_range(&self, range: &RequestRange) -> Range { @@ -258,8 +262,12 @@ where Cache: DataCache + Send + Sync + 'static, Runtime: Spawn, { - async fn try_compose_parts(&mut self, request_stream: impl Stream>) { - if let Err(e) = self.compose_parts(request_stream).await { + async fn try_compose_parts( + &mut self, + request_stream: impl Stream>, + range: RequestRange, + ) { + if let Err(e) = self.compose_parts(request_stream, range).await { trace!(error=?e, "part stream task failed"); self.part_queue_producer.push(Err(e)); } @@ -269,6 +277,7 @@ where async fn compose_parts( &mut self, request_stream: impl Stream>, + range: RequestRange, ) -> Result<(), PrefetchReadError> { let key = self.cache_key.key(); let block_size = self.cache.block_size(); @@ -321,7 +330,7 @@ where } // We have a full block: write it to the cache, send it to the queue, and flush the buffer. - self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key); + self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key, range); self.block_index += 1; self.block_offset += block_size; buffer = ChecksummedBytes::default(); @@ -337,19 +346,26 @@ where "a partial block is only allowed at the end of the object" ); // Write the last block to the cache. - self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key); + self.update_cache(buffer, self.block_index, self.block_offset, &self.cache_key, range); } Ok(()) } - fn update_cache(&self, block: ChecksummedBytes, block_index: u64, block_offset: u64, object_id: &ObjectId) { + fn update_cache( + &self, + block: ChecksummedBytes, + block_index: u64, + block_offset: u64, + object_id: &ObjectId, + range: RequestRange, + ) { let object_id = object_id.clone(); let cache = self.cache.clone(); self.runtime .spawn(async move { let start = Instant::now(); if let Err(error) = cache - .put_block(object_id.clone(), block_index, block_offset, block) + .put_block(object_id.clone(), block_index, block_offset, block, range.object_size()) .await { warn!(key=?object_id, block_index, ?error, "failed to update cache");
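For reference, the `ExpressDataCache` constructor above derives its key prefix by hashing the cache version, block size, and source bucket name. A small standalone sketch of that derivation is below (it assumes the `sha2` and `hex` crates, which the code above already uses; the bucket name and block sizes are example values only). The effect, presumably, is that caches configured with a different block size or source bucket use a disjoint prefix and so never read each other's blocks:

```rust
// Standalone sketch of the prefix derivation used by ExpressDataCache in this patch.
// Assumes dependencies: sha2 = "0.10", hex = "0.4".
use sha2::{Digest, Sha256};

const CACHE_VERSION: &str = "V1";

fn cache_prefix(block_size: u64, source_bucket_name: &str) -> String {
    hex::encode(
        Sha256::new()
            .chain_update(CACHE_VERSION.as_bytes())
            .chain_update(block_size.to_be_bytes())
            .chain_update(source_bucket_name.as_bytes())
            .finalize(),
    )
}

fn main() {
    // Different block sizes (or source buckets) yield different prefixes.
    let a = cache_prefix(1024 * 1024, "my-source-bucket");
    let b = cache_prefix(2 * 1024 * 1024, "my-source-bucket");
    println!("{a}\n{b}");
    assert_ne!(a, b);
}
```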