From fb47515589caf99aee23bfb500dd923818aac944 Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Thu, 16 Nov 2023 08:44:01 +0000
Subject: [PATCH 1/4] Store block offsets in disk data cache

Signed-off-by: Alessandro Passaro
---
 .../src/data_cache/disk_data_cache.rs | 131 ++++++++++++------
 1 file changed, 85 insertions(+), 46 deletions(-)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index d100ce8f7..bcd8fe123 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -48,8 +48,10 @@ pub enum CacheLimit {
 #[derive(Serialize, Deserialize, Debug)]
 struct DiskBlockHeader {
     block_idx: BlockIndex,
+    block_offset: u64,
     etag: String,
     s3_key: String,
+    data_checksum: u32,
     header_checksum: u32,
 }
 
@@ -72,37 +74,58 @@ enum DiskBlockAccessError {
 
 impl DiskBlockHeader {
     /// Creates a new [DiskBlockHeader]
-    pub fn new(block_idx: BlockIndex, etag: String, s3_key: String) -> Self {
-        let header_checksum = Self::compute_checksum(block_idx, &etag, &s3_key).value();
+    pub fn new(block_idx: BlockIndex, block_offset: u64, etag: String, s3_key: String, data_checksum: Crc32c) -> Self {
+        let data_checksum = data_checksum.value();
+        let header_checksum = Self::compute_checksum(block_idx, block_offset, &etag, &s3_key, data_checksum).value();
         DiskBlockHeader {
             block_idx,
+            block_offset,
             etag,
             s3_key,
+            data_checksum,
             header_checksum,
         }
     }
 
-    fn compute_checksum(block_idx: BlockIndex, etag: &str, s3_key: &str) -> Crc32c {
+    fn compute_checksum(
+        block_idx: BlockIndex,
+        block_offset: u64,
+        etag: &str,
+        s3_key: &str,
+        data_checksum: u32,
+    ) -> Crc32c {
         let mut hasher = crc32c::Hasher::new();
-        hasher.update(block_idx.to_be_bytes().as_ref());
+        hasher.update(&block_idx.to_be_bytes());
+        hasher.update(&block_offset.to_be_bytes());
         hasher.update(etag.as_bytes());
         hasher.update(s3_key.as_bytes());
+        hasher.update(&data_checksum.to_be_bytes());
         hasher.finalize()
     }
 
-    /// Validate the integrity of the contained data.
+    /// Validate the integrity of the contained data and return the stored data checksum.
     ///
     /// Execute this method before acting on the data contained within.
-    pub fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), DiskBlockAccessError> {
+    pub fn validate(
+        &self,
+        s3_key: &str,
+        etag: &str,
+        block_idx: BlockIndex,
+        block_offset: u64,
+    ) -> Result<Crc32c, DiskBlockAccessError> {
         let s3_key_match = s3_key == self.s3_key;
         let etag_match = etag == self.etag;
         let block_idx_match = block_idx == self.block_idx;
+        let block_offset_match = block_offset == self.block_offset;
 
-        if s3_key_match && etag_match && block_idx_match {
-            if Self::compute_checksum(block_idx, etag, s3_key).value() != self.header_checksum {
+        let data_checksum = self.data_checksum;
+        if s3_key_match && etag_match && block_idx_match && block_offset_match {
+            if Self::compute_checksum(block_idx, block_offset, etag, s3_key, data_checksum).value()
+                != self.header_checksum
+            {
                 Err(DiskBlockAccessError::ChecksumError)
             } else {
-                Ok(())
+                Ok(Crc32c::new(data_checksum))
             }
         } else {
             error!(
@@ -121,8 +144,6 @@ struct DiskBlock {
     header: DiskBlockHeader,
     /// Cached bytes
     data: Bytes,
-    /// Checksum over the bytes in `data`
-    data_checksum: u32,
 }
 
 impl DiskBlock {
@@ -133,28 +154,32 @@ impl DiskBlock {
     fn new(
         cache_key: CacheKey,
         block_idx: BlockIndex,
+        block_offset: u64,
         bytes: ChecksummedBytes,
     ) -> Result<Self, DiskBlockCreationError> {
         let (s3_key, etag) = (cache_key.s3_key, cache_key.etag.into_inner());
-        let header = DiskBlockHeader::new(block_idx, etag, s3_key);
-        let (data, data_checksum) = bytes.into_inner()?;
-        let data_checksum = data_checksum.value();
+        let (data, data_checksum) = bytes.into_inner()?;
+        let header = DiskBlockHeader::new(block_idx, block_offset, etag, s3_key, data_checksum);
 
-        Ok(DiskBlock {
-            data,
-            data_checksum,
-            header,
-        })
+        Ok(DiskBlock { data, header })
     }
 
     /// Extract the block data, checking that fields such as S3 key, etc. match what we expect.
     ///
     /// Comparing these fields helps ensure we have not corrupted or swapped block data on disk.
- fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> Result { - self.header - .validate(cache_key.s3_key.as_str(), cache_key.etag.as_str(), block_idx)?; - let bytes = ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.data_checksum)); + fn data( + &self, + cache_key: &CacheKey, + block_idx: BlockIndex, + block_offset: u64, + ) -> Result { + let data_checksum = self.header.validate( + cache_key.s3_key.as_str(), + cache_key.etag.as_str(), + block_idx, + block_offset, + )?; + let bytes = ChecksummedBytes::new(self.data.clone(), data_checksum); Ok(bytes) } } @@ -186,6 +211,7 @@ impl DiskDataCache { path: impl AsRef, cache_key: &CacheKey, block_idx: BlockIndex, + block_offset: u64, ) -> DataCacheResult> { let mut file = match fs::File::open(path.as_ref()) { Ok(file) => file, @@ -210,11 +236,13 @@ impl DiskDataCache { return Err(DataCacheError::InvalidBlockContent); } }; - let bytes = block.data(cache_key, block_idx).map_err(|err| match err { - DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => { - DataCacheError::InvalidBlockContent - } - })?; + let bytes = block + .data(cache_key, block_idx, block_offset) + .map_err(|err| match err { + DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => { + DataCacheError::InvalidBlockContent + } + })?; Ok(Some(bytes)) } @@ -293,7 +321,8 @@ impl DataCache for DiskDataCache { fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult> { let block_key = DiskBlockKey::new(cache_key, block_idx); let path = self.get_path_for_block_key(&block_key); - match self.read_block(&path, cache_key, block_idx) { + let block_offset = block_idx * self.block_size; + match self.read_block(&path, cache_key, block_idx, block_offset) { Ok(None) => Ok(None), Ok(Some(bytes)) => { if let Some(usage) = &self.usage { @@ -320,7 +349,8 @@ impl DataCache for DiskDataCache { let path = self.get_path_for_block_key(&block_key); trace!(?cache_key, ?path, "new block will be created in disk cache"); - let block = DiskBlock::new(cache_key, block_idx, bytes).map_err(|err| match err { + let block_offset = block_idx * self.block_size; + let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err { DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent, })?; @@ -443,11 +473,10 @@ mod tests { etag: ETag::for_tests(), s3_key: String::from("hello-world"), }; - let block = DiskBlock::new(cache_key, 100, data).expect("should success as data checksum is valid"); + let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should success as data checksum is valid"); let expected_bytes: Vec = vec![ - 100, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0, - 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 144, 51, 75, 183, 3, 0, 0, 0, 0, 0, 0, 0, - 70, 111, 111, 9, 85, 128, 46, + 100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0, + 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 9, 85, 128, 46, 29, 32, 6, 192, 3, 0, 0, 0, 0, 0, 0, 0, 70, 111, 111, ]; let serialized_bytes = bincode::serialize(&block).unwrap(); assert_eq!( @@ -714,49 +743,59 @@ mod tests { etag: ETag::from_str("badetag").unwrap(), }; - let block = DiskBlock::new(cache_key_1.clone(), 0, data_1.clone()).expect("should have no checksum err"); + let block = DiskBlock::new(cache_key_1.clone(), 0, 0, 
data_1.clone()).expect("should have no checksum err"); block - .data(&cache_key_1, 1) + .data(&cache_key_1, 1, 0) .expect_err("should fail due to incorrect block index"); block - .data(&cache_key_2, 0) + .data(&cache_key_1, 0, 1024) + .expect_err("should fail due to incorrect block offset"); + block + .data(&cache_key_2, 0, 0) .expect_err("should fail due to incorrect s3 key in cache key"); block - .data(&cache_key_3, 0) + .data(&cache_key_3, 0, 0) .expect_err("should fail due to incorrect etag in cache key"); - let unpacked_bytes = block.data(&cache_key_1, 0).expect("should be OK as all fields match"); + let unpacked_bytes = block.data(&cache_key_1, 0, 0).expect("should be OK as all fields match"); assert_eq!(data_1, unpacked_bytes, "data block should return original bytes"); } #[test] fn validate_block_header() { let block_idx = 0; + let block_offset = 0; let etag = ETag::for_tests(); let s3_key = String::from("s3/key"); - let mut header = DiskBlockHeader::new(block_idx, etag.as_str().to_owned(), s3_key.clone()); + let data_checksum = Crc32c::new(42); + let mut header = DiskBlockHeader::new(block_idx, block_offset, etag.as_str().to_owned(), s3_key.clone(), data_checksum); - header - .validate(&s3_key, etag.as_str(), block_idx) + let checksum = header + .validate(&s3_key, etag.as_str(), block_idx, block_offset) .expect("should be OK with valid fields and checksum"); + assert_eq!(data_checksum, checksum); // Bad fields let err = header - .validate("hello", etag.as_str(), block_idx) + .validate("hello", etag.as_str(), block_idx, block_offset) .expect_err("should fail with invalid s3_key"); assert!(matches!(err, DiskBlockAccessError::FieldMismatchError)); let err = header - .validate(&s3_key, "bad etag", block_idx) + .validate(&s3_key, "bad etag", block_idx, block_offset) .expect_err("should fail with invalid etag"); assert!(matches!(err, DiskBlockAccessError::FieldMismatchError)); let err = header - .validate(&s3_key, etag.as_str(), 5) + .validate(&s3_key, etag.as_str(), 5, block_offset) .expect_err("should fail with invalid block idx"); assert!(matches!(err, DiskBlockAccessError::FieldMismatchError)); + let err = header + .validate(&s3_key, etag.as_str(), block_idx, 1024) + .expect_err("should fail with invalid block offset"); + assert!(matches!(err, DiskBlockAccessError::FieldMismatchError)); // Bad checksum header.header_checksum = 23; let err = header - .validate(&s3_key, etag.as_str(), block_idx) + .validate(&s3_key, etag.as_str(), block_idx, block_offset) .expect_err("should fail with invalid checksum"); assert!(matches!(err, DiskBlockAccessError::ChecksumError)); } From b6214f5a01c7bc63c8c10b6d9da3d00cf6a67442 Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Fri, 17 Nov 2023 10:09:13 +0000 Subject: [PATCH 2/4] Add the block offset to the methods in data cache Signed-off-by: Alessandro Passaro --- mountpoint-s3/src/data_cache.rs | 17 +++- .../src/data_cache/disk_data_cache.rs | 89 +++++++++++++------ .../src/data_cache/in_memory_data_cache.rs | 42 ++++++--- mountpoint-s3/src/prefetch/caching_stream.rs | 31 ++++--- 4 files changed, 126 insertions(+), 53 deletions(-) diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs index 9ffc2e56f..1ff408d88 100644 --- a/mountpoint-s3/src/data_cache.rs +++ b/mountpoint-s3/src/data_cache.rs @@ -31,6 +31,8 @@ pub enum DataCacheError { IoFailure(#[from] std::io::Error), #[error("Block content was not valid/readable")] InvalidBlockContent, + #[error("Block offset does not match block index")] + InvalidBlockOffset, 
#[error("Error while trying to evict cache content")] EvictionFailure, } @@ -45,10 +47,21 @@ pub trait DataCache { /// Get block of data from the cache for the given [CacheKey] and [BlockIndex], if available. /// /// Operation may fail due to errors, or return [None] if the block was not available in the cache. - fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult>; + fn get_block( + &self, + cache_key: &CacheKey, + block_idx: BlockIndex, + block_offset: u64, + ) -> DataCacheResult>; /// Put block of data to the cache for the given [CacheKey] and [BlockIndex]. - fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()>; + fn put_block( + &self, + cache_key: CacheKey, + block_idx: BlockIndex, + block_offset: u64, + bytes: ChecksummedBytes, + ) -> DataCacheResult<()>; /// Returns the block size for the data cache. fn block_size(&self) -> u64; diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index bcd8fe123..b6f0a6270 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -318,7 +318,15 @@ fn hash_cache_key_raw(cache_key: &CacheKey) -> [u8; 32] { } impl DataCache for DiskDataCache { - fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult> { + fn get_block( + &self, + cache_key: &CacheKey, + block_idx: BlockIndex, + block_offset: u64, + ) -> DataCacheResult> { + if block_offset != block_idx * self.block_size { + return Err(DataCacheError::InvalidBlockOffset); + } let block_key = DiskBlockKey::new(cache_key, block_idx); let path = self.get_path_for_block_key(&block_key); let block_offset = block_idx * self.block_size; @@ -344,12 +352,21 @@ impl DataCache for DiskDataCache { } } - fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> { + fn put_block( + &self, + cache_key: CacheKey, + block_idx: BlockIndex, + block_offset: u64, + bytes: ChecksummedBytes, + ) -> DataCacheResult<()> { + if block_offset != block_idx * self.block_size { + return Err(DataCacheError::InvalidBlockOffset); + } + let block_key = DiskBlockKey::new(&cache_key, block_idx); let path = self.get_path_for_block_key(&block_key); trace!(?cache_key, ?path, "new block will be created in disk cache"); - let block_offset = block_idx * self.block_size; let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err { DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent, })?; @@ -475,8 +492,9 @@ mod tests { }; let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should success as data checksum is valid"); let expected_bytes: Vec = vec![ - 100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0, - 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 9, 85, 128, 46, 29, 32, 6, 192, 3, 0, 0, 0, 0, 0, 0, 0, 70, 111, 111, + 100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101, + 116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 9, 85, 128, + 46, 29, 32, 6, 192, 3, 0, 0, 0, 0, 0, 0, 0, 70, 111, 111, ]; let serialized_bytes = bincode::serialize(&block).unwrap(); assert_eq!( @@ -531,8 +549,9 @@ mod tests { let data_2 = ChecksummedBytes::from_bytes("Bar".into()); let data_3 = 
ChecksummedBytes::from_bytes("Baz".into()); + let block_size = 8 * 1024 * 1024; let cache_directory = tempfile::tempdir().unwrap(); - let cache = DiskDataCache::new(cache_directory.into_path(), 8 * 1024 * 1024, CacheLimit::Unbounded); + let cache = DiskDataCache::new(cache_directory.into_path(), block_size, CacheLimit::Unbounded); let cache_key_1 = CacheKey { s3_key: "a".into(), etag: ETag::for_tests(), @@ -542,7 +561,7 @@ mod tests { etag: ETag::for_tests(), }; - let block = cache.get_block(&cache_key_1, 0).expect("cache should be accessible"); + let block = cache.get_block(&cache_key_1, 0, 0).expect("cache should be accessible"); assert!( block.is_none(), "no entry should be available to return but got {:?}", @@ -551,10 +570,10 @@ mod tests { // PUT and GET, OK? cache - .put_block(cache_key_1.clone(), 0, data_1.clone()) + .put_block(cache_key_1.clone(), 0, 0, data_1.clone()) .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_1, 0) + .get_block(&cache_key_1, 0, 0) .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -564,10 +583,10 @@ mod tests { // PUT AND GET a second file, OK? cache - .put_block(cache_key_2.clone(), 0, data_2.clone()) + .put_block(cache_key_2.clone(), 0, 0, data_2.clone()) .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_2, 0) + .get_block(&cache_key_2, 0, 0) .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -577,10 +596,10 @@ mod tests { // PUT AND GET a second block in a cache entry, OK? cache - .put_block(cache_key_1.clone(), 1, data_3.clone()) + .put_block(cache_key_1.clone(), 1, block_size, data_3.clone()) .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key_1, 1) + .get_block(&cache_key_1, 1, block_size) .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -590,7 +609,7 @@ mod tests { // Entry 1's first block still intact let entry = cache - .get_block(&cache_key_1, 0) + .get_block(&cache_key_1, 0, 0) .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -612,10 +631,10 @@ mod tests { }; cache - .put_block(cache_key.clone(), 0, slice.clone()) + .put_block(cache_key.clone(), 0, 0, slice.clone()) .expect("cache should be accessible"); let entry = cache - .get_block(&cache_key, 0) + .get_block(&cache_key, 0, 0) .expect("cache should be accessible") .expect("cache entry should be returned"); assert_eq!( @@ -627,6 +646,11 @@ mod tests { #[test] fn test_eviction() { + const BLOCK_SIZE: usize = 100 * 1024; + const LARGE_OBJECT_SIZE: usize = 1024 * 1024; + const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2; + const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE; + fn create_random(seed: u64, size: usize) -> ChecksummedBytes { let mut rng = ChaCha20Rng::seed_from_u64(seed); let mut body = vec![0u8; size]; @@ -642,7 +666,7 @@ mod tests { expected_bytes: &ChecksummedBytes, ) -> bool { if let Some(retrieved) = cache - .get_block(cache_key, block_idx) + .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64) .expect("cache should be accessible") { assert_eq!( @@ -658,11 +682,6 @@ mod tests { } } - const BLOCK_SIZE: usize = 100 * 1024; - const LARGE_OBJECT_SIZE: usize = 1024 * 1024; - const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2; - const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE; - let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE); let large_object_blocks: Vec<_> = (0..large_object.len()) 
.step_by(BLOCK_SIZE) @@ -693,14 +712,24 @@ mod tests { // Put all of large_object for (block_idx, bytes) in large_object_blocks.iter().enumerate() { cache - .put_block(large_object_key.clone(), block_idx as u64, bytes.clone()) + .put_block( + large_object_key.clone(), + block_idx as u64, + (block_idx * BLOCK_SIZE) as u64, + bytes.clone(), + ) .unwrap(); } // Put all of small_object for (block_idx, bytes) in small_object_blocks.iter().enumerate() { cache - .put_block(small_object_key.clone(), block_idx as u64, bytes.clone()) + .put_block( + small_object_key.clone(), + block_idx as u64, + (block_idx * BLOCK_SIZE) as u64, + bytes.clone(), + ) .unwrap(); } @@ -756,7 +785,9 @@ mod tests { block .data(&cache_key_3, 0, 0) .expect_err("should fail due to incorrect etag in cache key"); - let unpacked_bytes = block.data(&cache_key_1, 0, 0).expect("should be OK as all fields match"); + let unpacked_bytes = block + .data(&cache_key_1, 0, 0) + .expect("should be OK as all fields match"); assert_eq!(data_1, unpacked_bytes, "data block should return original bytes"); } @@ -767,7 +798,13 @@ mod tests { let etag = ETag::for_tests(); let s3_key = String::from("s3/key"); let data_checksum = Crc32c::new(42); - let mut header = DiskBlockHeader::new(block_idx, block_offset, etag.as_str().to_owned(), s3_key.clone(), data_checksum); + let mut header = DiskBlockHeader::new( + block_idx, + block_offset, + etag.as_str().to_owned(), + s3_key.clone(), + data_checksum, + ); let checksum = header .validate(&s3_key, etag.as_str(), block_idx, block_offset) diff --git a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs index eef1872bd..5bc0fe9b1 100644 --- a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs +++ b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use std::default::Default; -use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult}; +use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult}; use crate::sync::RwLock; /// Simple in-memory (RAM) implementation of [DataCache]. Recommended for use in testing only. 
@@ -23,13 +23,30 @@ impl InMemoryDataCache { } impl DataCache for InMemoryDataCache { - fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult> { + fn get_block( + &self, + cache_key: &CacheKey, + block_idx: BlockIndex, + block_offset: u64, + ) -> DataCacheResult> { + if block_offset != block_idx * self.block_size { + return Err(DataCacheError::InvalidBlockOffset); + } let data = self.data.read().unwrap(); let block_data = data.get(cache_key).and_then(|blocks| blocks.get(&block_idx)).cloned(); Ok(block_data) } - fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> { + fn put_block( + &self, + cache_key: CacheKey, + block_idx: BlockIndex, + block_offset: u64, + bytes: ChecksummedBytes, + ) -> DataCacheResult<()> { + if block_offset != block_idx * self.block_size { + return Err(DataCacheError::InvalidBlockOffset); + } let mut data = self.data.write().unwrap(); let blocks = data.entry(cache_key).or_default(); blocks.insert(block_idx, bytes); @@ -57,7 +74,8 @@ mod tests { let data_3 = Bytes::from_static(b"Baz"); let data_3 = ChecksummedBytes::from_bytes(data_3.clone()); - let cache = InMemoryDataCache::new(8 * 1024 * 1024); + let block_size = 8 * 1024 * 1024; + let cache = InMemoryDataCache::new(block_size); let cache_key_1 = CacheKey { s3_key: "a".into(), etag: ETag::for_tests(), @@ -67,7 +85,7 @@ mod tests { etag: ETag::for_tests(), }; - let block = cache.get_block(&cache_key_1, 0).expect("cache is accessible"); + let block = cache.get_block(&cache_key_1, 0, 0).expect("cache is accessible"); assert!( block.is_none(), "no entry should be available to return but got {:?}", @@ -76,10 +94,10 @@ mod tests { // PUT and GET, OK? cache - .put_block(cache_key_1.clone(), 0, data_1.clone()) + .put_block(cache_key_1.clone(), 0, 0, data_1.clone()) .expect("cache is accessible"); let entry = cache - .get_block(&cache_key_1, 0) + .get_block(&cache_key_1, 0, 0) .expect("cache is accessible") .expect("cache entry should be returned"); assert_eq!( @@ -89,10 +107,10 @@ mod tests { // PUT AND GET a second file, OK? cache - .put_block(cache_key_2.clone(), 0, data_2.clone()) + .put_block(cache_key_2.clone(), 0, 0, data_2.clone()) .expect("cache is accessible"); let entry = cache - .get_block(&cache_key_2, 0) + .get_block(&cache_key_2, 0, 0) .expect("cache is accessible") .expect("cache entry should be returned"); assert_eq!( @@ -102,10 +120,10 @@ mod tests { // PUT AND GET a second block in a cache entry, OK? cache - .put_block(cache_key_1.clone(), 1, data_3.clone()) + .put_block(cache_key_1.clone(), 1, block_size, data_3.clone()) .expect("cache is accessible"); let entry = cache - .get_block(&cache_key_1, 1) + .get_block(&cache_key_1, 1, block_size) .expect("cache is accessible") .expect("cache entry should be returned"); assert_eq!( @@ -115,7 +133,7 @@ mod tests { // Entry 1's first block still intact let entry = cache - .get_block(&cache_key_1, 0) + .get_block(&cache_key_1, 0, 0) .expect("cache is accessible") .expect("cache entry should be returned"); assert_eq!( diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index 7de69768c..2ea197f73 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -118,13 +118,15 @@ where // We could check for missing blocks in advance and pre-emptively start a GetObject // request, but since this stream is already behind the prefetcher, the delay is // already likely negligible. 
+ let mut block_offset = block_range.start * block_size; for block_index in block_range.clone() { - match self.cache.get_block(&self.cache_key, block_index) { + match self.cache.get_block(&self.cache_key, block_index, block_offset) { Ok(Some(block)) => { trace!(?key, ?range, block_index, "cache hit"); - let part = self.make_part(block, block_index, &range); + let part = self.make_part(block, block_index, block_offset, &range); metrics::counter!("cache.total_bytes", part.len() as u64, "type" => "read"); self.part_queue_producer.push(Ok(part)); + block_offset += block_size; continue; } Ok(None) => trace!(?key, ?range, block_index, "cache miss - no data for block"), @@ -132,7 +134,7 @@ where } // If a block is uncached or reading it fails, fallback to S3 for the rest of the stream. return self - .get_from_client(range.trim_start(block_index * block_size), block_index..block_range.end) + .get_from_client(range.trim_start(block_offset), block_index..block_range.end) .await; } } @@ -212,7 +214,7 @@ where // We have a full block: write it to the cache, send it to the queue, and flush the buffer. self.update_cache(block_index, block_offset, &buffer); self.part_queue_producer - .push(Ok(self.make_part(buffer, block_index, &range))); + .push(Ok(self.make_part(buffer, block_index, block_offset, &range))); block_index += 1; block_offset += block_size; buffer = ChecksummedBytes::default(); @@ -236,7 +238,7 @@ where // Write the last block to the cache. self.update_cache(block_index, block_offset, &buffer); self.part_queue_producer - .push(Ok(self.make_part(buffer, block_index, &range))); + .push(Ok(self.make_part(buffer, block_index, block_offset, &range))); } break; } @@ -248,13 +250,11 @@ where fn update_cache(&self, block_index: u64, block_offset: u64, block: &ChecksummedBytes) { // TODO: consider updating the cache asynchronously let start = Instant::now(); - match self.cache.put_block(self.cache_key.clone(), block_index, block.clone()) { + match self + .cache + .put_block(self.cache_key.clone(), block_index, block_offset, block.clone()) + { Ok(()) => { - assert_eq!( - block_offset, - block_index * self.cache.block_size(), - "invalid block offset" - ); metrics::histogram!("cache.write_duration_us", start.elapsed().as_micros() as f64); metrics::counter!("cache.total_bytes", block.len() as u64, "type" => "write"); } @@ -266,9 +266,14 @@ where /// Creates a Part that can be streamed to the prefetcher from the given cache block. /// If required, trims the block bytes to the request range. 
-    fn make_part(&self, block: ChecksummedBytes, block_index: u64, range: &RequestRange) -> Part {
+    fn make_part(&self, block: ChecksummedBytes, block_index: u64, block_offset: u64, range: &RequestRange) -> Part {
+        assert_eq!(
+            block_offset,
+            block_index * self.cache.block_size(),
+            "invalid block offset"
+        );
+
         let key = &self.cache_key.s3_key;
-        let block_offset = block_index * self.cache.block_size();
         let block_size = block.len();
         let part_range = range
             .trim_start(block_offset)

From 83e977aed5c949f1e4907efc80d9ad2bfa029781 Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Fri, 17 Nov 2023 14:47:56 +0000
Subject: [PATCH 3/4] Fix typo

Signed-off-by: Alessandro Passaro
---
 mountpoint-s3/src/data_cache/disk_data_cache.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index b6f0a6270..1b66f9524 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -490,7 +490,7 @@ mod tests {
             etag: ETag::for_tests(),
             s3_key: String::from("hello-world"),
         };
-        let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should success as data checksum is valid");
+        let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should succeed as data checksum is valid");
         let expected_bytes: Vec<u8> = vec![
             100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101,
             116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 9, 85, 128,

From 97ef8bdd636e36e9f76d93960429f1ec55c96349 Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Fri, 17 Nov 2023 14:48:13 +0000
Subject: [PATCH 4/4] Remove redundant variable

Signed-off-by: Alessandro Passaro
---
 mountpoint-s3/src/data_cache/disk_data_cache.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index 1b66f9524..e7d37a7f2 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -329,7 +329,6 @@ impl DataCache for DiskDataCache {
         }
         let block_key = DiskBlockKey::new(cache_key, block_idx);
         let path = self.get_path_for_block_key(&block_key);
-        let block_offset = block_idx * self.block_size;
         match self.read_block(&path, cache_key, block_idx, block_offset) {
             Ok(None) => Ok(None),
             Ok(Some(bytes)) => {
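
Taken together, the four patches thread a block offset through every layer of the cache: DiskBlockHeader persists the offset and folds it into the header checksum, the DataCache trait accepts it on get_block/put_block, both cache implementations reject a call whose offset disagrees with block_idx * block_size, and the caching stream computes the offset as it walks the block range. Below is a minimal sketch of a caller driving the revised trait, assuming the data_cache types from the diffs above are in scope; the helper function itself is illustrative only and not part of the patch series.

    // Sketch only, not code from these patches: derive the offset from the
    // index the same way DiskDataCache, InMemoryDataCache and the caching
    // stream do, so the offset/index consistency check always passes.
    fn read_cached_block<C: DataCache>(
        cache: &C,
        cache_key: &CacheKey,
        block_idx: BlockIndex,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        // Any other offset would make the cache return
        // DataCacheError::InvalidBlockOffset.
        let block_offset = block_idx * cache.block_size();
        cache.get_block(cache_key, block_idx, block_offset)
    }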