From cb6958ff381a76113dd4997d43931e1f88504ba7 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 26 Oct 2023 14:51:39 +0100 Subject: [PATCH 01/25] Implement disk-based DataCache with no checksums or eviction Signed-off-by: Daniel Carl Jones --- Cargo.lock | 1 + mountpoint-s3/Cargo.toml | 1 + mountpoint-s3/src/checksums.rs | 10 + mountpoint-s3/src/data_cache.rs | 1 + .../src/data_cache/disk_data_cache.rs | 312 ++++++++++++++++++ 5 files changed, 325 insertions(+) create mode 100644 mountpoint-s3/src/data_cache/disk_data_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 82747a844..a19a1c2c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1950,6 +1950,7 @@ dependencies = [ "aws-sdk-s3", "aws-sdk-sts", "base16ct", + "base64ct", "built", "bytes", "clap 4.3.9", diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index b1a806419..6b3bd183c 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -36,6 +36,7 @@ nix = "0.26.2" time = { version = "0.3.17", features = ["macros", "formatting"] } const_format = "0.2.30" serde_json = "1.0.95" +base64ct = { version = "1.6.0", features = ["std"] } [target.'cfg(target_os = "linux")'.dependencies] procfs = { version = "0.15.1", default-features = false } diff --git a/mountpoint-s3/src/checksums.rs b/mountpoint-s3/src/checksums.rs index cdc6310b4..8e97e7fab 100644 --- a/mountpoint-s3/src/checksums.rs +++ b/mountpoint-s3/src/checksums.rs @@ -151,6 +151,16 @@ impl ChecksummedBytes { } Ok(()) } + + /// Provide the underlying bytes and the associated checksum, + /// which may be recalulated if the checksum covers a larger slice than the current slice. + /// Validation may or may not be triggered, and **bytes or checksum may be corrupt** even if result returns [Ok]. + /// + /// If you are only interested in the underlying bytes, **you should use `into_bytes()`**. + pub fn into_inner(self) -> Result<(Bytes, Crc32c), IntegrityError> { + self.shrink_to_fit()?; + Ok((self.curr_slice, self.checksum)) + } } impl Default for ChecksummedBytes { diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs index 118193d9d..0c3f4f92d 100644 --- a/mountpoint-s3/src/data_cache.rs +++ b/mountpoint-s3/src/data_cache.rs @@ -4,6 +4,7 @@ //! reducing both the number of requests as well as the latency for the reads. //! Ultimately, this means reduced cost in terms of S3 billing as well as compute time. +pub mod disk_data_cache; pub mod in_memory_data_cache; use std::ops::RangeBounds; diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs new file mode 100644 index 000000000..2ded4889f --- /dev/null +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -0,0 +1,312 @@ +//! Module for the on-disk data cache implementation. + +use std::fs; +use std::io::{ErrorKind, Write}; +use std::ops::RangeBounds; +use std::path::PathBuf; + +use base64ct::{Base64Url, Encoding}; +use bytes::{BufMut, BytesMut}; +use mountpoint_s3_crt::checksums::crc32c; +use tracing::{error, trace, warn}; + +use crate::data_cache::DataCacheError; + +use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult}; + +/// On-disk implementation of [DataCache]. +/// +/// TODO: Store checksums on disk, reconstruct as [ChecksummedBytes] using same checksum avoiding recomputation. +/// +/// TODO: Store additional metadata with each block such as expected S3 key, ETag, etc.. 
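// Illustrative sketch (not from this patch series) of how the type below is meant to be driven
// through the [DataCache] trait, mirroring the unit tests further down in this file; the `use`
// paths and the visibility of `CacheKey`'s fields are assumptions about the crate layout.
use mountpoint_s3::checksums::ChecksummedBytes;
use mountpoint_s3::data_cache::{disk_data_cache::DiskDataCache, CacheKey, DataCache};
use mountpoint_s3_client::types::ETag;

fn cache_round_trip(cache_directory: std::path::PathBuf) {
    let cache = DiskDataCache::new(cache_directory, 8 * 1024 * 1024);
    let key = CacheKey {
        s3_key: "my-object".to_owned(),
        etag: ETag::for_tests(),
    };
    let block = ChecksummedBytes::from_bytes("hello".into());
    cache
        .put_block(key.clone(), 0, block.clone())
        .expect("block should be written to disk");
    let cached = cache
        .get_block(&key, 0)
        .expect("cache should be readable")
        .expect("block 0 was just written");
    assert_eq!(block, cached, "read-back block should match what was written");
}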
+pub struct DiskDataCache { + block_size: u64, + cache_directory: PathBuf, +} + +impl DiskDataCache { + /// Create a new instance of an [DiskDataCache] with the specified `block_size`. + pub fn new(cache_directory: PathBuf, block_size: u64) -> Self { + DiskDataCache { + block_size, + cache_directory, + } + } + + /// Get the relative path for the given block. + fn get_path_for_key(&self, cache_key: &CacheKey) -> PathBuf { + let mut path = self.cache_directory.join("v1"); + + // An S3 key may be up to 1024 UTF-8 bytes long, which exceeds the maximum UNIX file name length. + // Instead, we encode the key and split into 255 character long directory names. + let encoded_s3_key = Base64Url::encode_string(cache_key.s3_key.as_bytes()); + let mut slice = encoded_s3_key.as_str(); + while !slice.is_empty() { + let (chunk, remaining) = slice.split_at(255.min(slice.len())); + path.push(chunk); + slice = remaining; + } + + path.push(cache_key.etag.as_str()); + path + } + + /// Get path for the given block. + fn get_path_for_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> PathBuf { + let mut path = self.get_path_for_key(cache_key); + path.push(format!("{}.block", block_idx)); + path + } +} + +impl DataCache for DiskDataCache { + fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult> { + let path = self.get_path_for_block(cache_key, block_idx); + let mut file = match fs::File::open(path) { + Ok(file) => file, + Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(err.into()), + }; + + let bytes = BytesMut::with_capacity(self.block_size as usize); + let mut writer = bytes.writer(); + std::io::copy(&mut file, &mut writer)?; + let bytes = writer.into_inner().freeze(); + + // TODO: Read checksum from block file + let checksum = crc32c::checksum(bytes.as_ref()); + let bytes = ChecksummedBytes::new(bytes, checksum); + + Ok(Some(bytes)) + } + + fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> { + let path = self.get_path_for_block(&cache_key, block_idx); + fs::create_dir_all(path.parent().expect("path should include cache key in directory name"))?; + let mut file = fs::File::create(path)?; + let (bytes, _checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?; + // TODO: Store checksum + file.write_all(&bytes)?; + file.sync_data()?; + Ok(()) + } + + fn block_size(&self) -> u64 { + self.block_size + } + + fn cached_block_indices>( + &self, + cache_key: &CacheKey, + range: R, + ) -> DataCacheResult> { + let path_for_cache_key = self.get_path_for_key(cache_key); + let read_dir = match fs::read_dir(&path_for_cache_key) { + Ok(handle) => handle, + Err(e) if e.kind() == ErrorKind::NotFound => { + trace!( + ?path_for_cache_key, + "no directory for cache key, assuming no block cached yet" + ); + return Ok(Vec::new()); + } + Err(e) => return Err(e.into()), + }; + + let mut indicies = Vec::new(); + for entry in read_dir.into_iter() { + if let Err(e) = entry { + return Err(DataCacheError::IoFailure(e)); + } + + let file_name = entry?.file_name(); + let file_name = file_name.to_string_lossy(); + let block_suffix = ".block"; + if file_name.ends_with(block_suffix) { + let end = file_name.len() - block_suffix.len(); + let block_idx = &file_name[..end]; + if let Ok(block_idx) = block_idx.parse::() { + if range.contains(&block_idx) { + indicies.push(block_idx); + } + } else { + error!( + path=?path_for_cache_key.join(file_name.as_ref()), + "unexpected file found 
in cache, name couldn't be parsed for block number" + ); + }; + } else { + warn!( + path=?path_for_cache_key.join(file_name.as_ref()), + "unexpected file found in cache without \".block\" suffix" + ); + } + } + + Ok(indicies) + } +} + +#[cfg(test)] +mod tests { + use std::ffi::OsString; + + use super::*; + + use mountpoint_s3_client::types::ETag; + use test_case::test_case; + + #[test_case("hello"; "simple string")] + #[test_case("foo/bar/baz"; "with forward slashes")] + #[test_case("hello+world"; "with plus char")] + #[test_case("hello\\ world"; "backslash")] + #[test_case("hello=world"; "equals")] + #[test_case("lookšŸŽ”emoji"; "emoji")] + fn test_get_path_for_key(s3_key: &str) { + let cache_dir = PathBuf::from("mountpoint-cache/"); + let data_cache = DiskDataCache::new(cache_dir, 1024); + + let encoded_s3_key = Base64Url::encode_string(s3_key.as_bytes()); + let etag = ETag::for_tests(); + let key = CacheKey { + etag, + s3_key: s3_key.to_owned(), + }; + let expected = vec!["mountpoint-cache", "v1", &encoded_s3_key, key.etag.as_str()]; + let path = data_cache.get_path_for_key(&key); + let results: Vec = path.iter().map(ToOwned::to_owned).collect(); + assert_eq!(expected, results); + } + + #[test] + fn test_get_path_for_key_very_long_key() { + let cache_dir = PathBuf::from("mountpoint-cache/"); + let data_cache = DiskDataCache::new(cache_dir, 1024); + + let s3_key = "a".repeat(266); + + let etag = ETag::for_tests(); + let key = CacheKey { + etag, + s3_key: s3_key.to_owned(), + }; + let expected = vec![ + "mountpoint-cache", + "v1", + "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\ + YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\ + YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWF", + "hYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE=", + key.etag.as_str(), + ]; + let path = data_cache.get_path_for_key(&key); + let results: Vec = path.iter().map(ToOwned::to_owned).collect(); + assert_eq!(expected, results); + } + + #[test] + fn test_put_get() { + let data_1 = ChecksummedBytes::from_bytes("Foo".into()); + let data_2 = ChecksummedBytes::from_bytes("Bar".into()); + let data_3 = ChecksummedBytes::from_bytes("Baz".into()); + + let cache_directory = tempfile::tempdir().unwrap(); + let cache = DiskDataCache::new(cache_directory.into_path(), 8 * 1024 * 1024); + let cache_key_1 = CacheKey { + s3_key: "a".into(), + etag: ETag::for_tests(), + }; + let cache_key_2 = CacheKey { + s3_key: "long-key_".repeat(100), // at least 900 chars, exceeding easily 255 chars (UNIX filename limit) + etag: ETag::for_tests(), + }; + + let block = cache.get_block(&cache_key_1, 0).expect("cache should be accessible"); + assert!( + block.is_none(), + "no entry should be available to return but got {:?}", + block, + ); + + // PUT and GET, OK? + cache + .put_block(cache_key_1.clone(), 0, data_1.clone()) + .expect("cache should be accessible"); + let entry = cache + .get_block(&cache_key_1, 0) + .expect("cache should be accessible") + .expect("cache entry should be returned"); + assert_eq!( + data_1, entry, + "cache entry returned should match original bytes after put" + ); + + // PUT AND GET a second file, OK? 
+ cache + .put_block(cache_key_2.clone(), 0, data_2.clone()) + .expect("cache should be accessible"); + let entry = cache + .get_block(&cache_key_2, 0) + .expect("cache should be accessible") + .expect("cache entry should be returned"); + assert_eq!( + data_2, entry, + "cache entry returned should match original bytes after put" + ); + + // PUT AND GET a second block in a cache entry, OK? + cache + .put_block(cache_key_1.clone(), 1, data_3.clone()) + .expect("cache should be accessible"); + let entry = cache + .get_block(&cache_key_1, 1) + .expect("cache should be accessible") + .expect("cache entry should be returned"); + assert_eq!( + data_3, entry, + "cache entry returned should match original bytes after put" + ); + + // Entry 1's first block still intact + let entry = cache + .get_block(&cache_key_1, 0) + .expect("cache should be accessible") + .expect("cache entry should be returned"); + assert_eq!( + data_1, entry, + "cache entry returned should match original bytes after put" + ); + } + + #[test] + fn test_cached_indices() { + let temp_dir = tempfile::tempdir().unwrap(); + let block_size = 1024; + let data_cache = DiskDataCache::new(temp_dir.into_path(), block_size); + + let cache_key = CacheKey { + s3_key: "HelloWorld".into(), + etag: ETag::for_tests(), + }; + let bytes = ChecksummedBytes::from_bytes("Hello World".into()); + + let cached_block_indices = data_cache.cached_block_indices(&cache_key, 0..).unwrap(); + assert!(cached_block_indices.is_empty()); + + data_cache.put_block(cache_key.clone(), 5, bytes.clone()).unwrap(); + let cached_block_indices = data_cache.cached_block_indices(&cache_key, 0..).unwrap(); + assert_eq!(cached_block_indices, vec![5]); + let cached_block_indices = data_cache.cached_block_indices(&cache_key, 12..).unwrap(); + assert!(cached_block_indices.is_empty()); + + let another_cache_key = CacheKey { + s3_key: "SomeOtherKey".into(), + etag: ETag::for_tests(), + }; + data_cache.put_block(another_cache_key, 5, bytes.clone()).unwrap(); + let cached_block_indices = data_cache.cached_block_indices(&cache_key, 0..).unwrap(); + assert_eq!(cached_block_indices, vec![5]); + + // TODO: Tests for `remove_block` + } +} From 8eb82b05175d3d1871a97e5339acd78ec17f3a3d Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Wed, 1 Nov 2023 18:07:03 +0000 Subject: [PATCH 02/25] Fix typos Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/checksums.rs | 2 +- mountpoint-s3/src/data_cache/disk_data_cache.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mountpoint-s3/src/checksums.rs b/mountpoint-s3/src/checksums.rs index 8e97e7fab..70f2547ac 100644 --- a/mountpoint-s3/src/checksums.rs +++ b/mountpoint-s3/src/checksums.rs @@ -153,7 +153,7 @@ impl ChecksummedBytes { } /// Provide the underlying bytes and the associated checksum, - /// which may be recalulated if the checksum covers a larger slice than the current slice. + /// which may be recalculated if the checksum covers a larger slice than the current slice. /// Validation may or may not be triggered, and **bytes or checksum may be corrupt** even if result returns [Ok]. /// /// If you are only interested in the underlying bytes, **you should use `into_bytes()`**. 
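The `into_inner` contract documented above is how the disk cache obtains raw bytes (and, in later patches, the checksum value) to write out. A minimal sketch of the intended usage follows; it is not part of the patch, and the module path for `ChecksummedBytes` is an assumption.

use bytes::Bytes;
use mountpoint_s3::checksums::ChecksummedBytes;

// Split a buffer into its raw bytes plus the checksum value to store alongside them.
// An `Err` signals a detected integrity problem, but as the doc comment above notes,
// an `Ok` result is still not a guarantee that the bytes or checksum are uncorrupted.
fn split_for_storage(buf: ChecksummedBytes) -> Option<(Bytes, u32)> {
    let (data, checksum) = buf.into_inner().ok()?;
    Some((data, checksum.value()))
}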
diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 2ded4889f..e9fede4fd 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -113,7 +113,7 @@ impl DataCache for DiskDataCache { Err(e) => return Err(e.into()), }; - let mut indicies = Vec::new(); + let mut indices = Vec::new(); for entry in read_dir.into_iter() { if let Err(e) = entry { return Err(DataCacheError::IoFailure(e)); @@ -127,7 +127,7 @@ impl DataCache for DiskDataCache { let block_idx = &file_name[..end]; if let Ok(block_idx) = block_idx.parse::() { if range.contains(&block_idx) { - indicies.push(block_idx); + indices.push(block_idx); } } else { error!( @@ -143,7 +143,7 @@ impl DataCache for DiskDataCache { } } - Ok(indicies) + Ok(indices) } } From 55fe5486fda34d38c9f484b520e1c20259b2f72f Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Wed, 1 Nov 2023 18:07:38 +0000 Subject: [PATCH 03/25] Replace Base64URL encoding with Base64URLUnpadded encoding for data cache Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index e9fede4fd..2ca428a05 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -5,7 +5,7 @@ use std::io::{ErrorKind, Write}; use std::ops::RangeBounds; use std::path::PathBuf; -use base64ct::{Base64Url, Encoding}; +use base64ct::{Base64UrlUnpadded, Encoding}; use bytes::{BufMut, BytesMut}; use mountpoint_s3_crt::checksums::crc32c; use tracing::{error, trace, warn}; @@ -39,7 +39,7 @@ impl DiskDataCache { // An S3 key may be up to 1024 UTF-8 bytes long, which exceeds the maximum UNIX file name length. // Instead, we encode the key and split into 255 character long directory names. 
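// Illustrative sketch (not from this patch series) of what switching from the padded to the
// unpadded Base64URL encoding changes: the padded encoder appends '=' characters to the
// directory name, while the unpadded variant adopted below does not.
use base64ct::{Base64Url, Base64UrlUnpadded, Encoding};

fn encoding_difference() {
    assert_eq!(Base64Url::encode_string(b"a"), "YQ==");
    assert_eq!(Base64UrlUnpadded::encode_string(b"a"), "YQ");
}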
- let encoded_s3_key = Base64Url::encode_string(cache_key.s3_key.as_bytes()); + let encoded_s3_key = Base64UrlUnpadded::encode_string(cache_key.s3_key.as_bytes()); let mut slice = encoded_s3_key.as_str(); while !slice.is_empty() { let (chunk, remaining) = slice.split_at(255.min(slice.len())); @@ -166,7 +166,7 @@ mod tests { let cache_dir = PathBuf::from("mountpoint-cache/"); let data_cache = DiskDataCache::new(cache_dir, 1024); - let encoded_s3_key = Base64Url::encode_string(s3_key.as_bytes()); + let encoded_s3_key = Base64UrlUnpadded::encode_string(s3_key.as_bytes()); let etag = ETag::for_tests(); let key = CacheKey { etag, @@ -196,7 +196,7 @@ mod tests { "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\ YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\ YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWF", - "hYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE=", + "hYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE", key.etag.as_str(), ]; let path = data_cache.get_path_for_key(&key); From d15596d28fe62fea5b6b9e80b379d18519f736a7 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Wed, 1 Nov 2023 18:08:01 +0000 Subject: [PATCH 04/25] Ensure cached indicies are sorted in DiskDataCache Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 2ca428a05..e0370bdbf 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -143,6 +143,8 @@ impl DataCache for DiskDataCache { } } + indices.sort(); + Ok(indices) } } From ad08838341f3c7ae254adcc1839fcf3bf03a4348 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Wed, 1 Nov 2023 18:37:48 +0000 Subject: [PATCH 05/25] Add trace message when creating block in cache Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index e0370bdbf..a28d3b6ef 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -82,6 +82,7 @@ impl DataCache for DiskDataCache { fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> { let path = self.get_path_for_block(&cache_key, block_idx); + trace!(?cache_key, ?path, "new block will be created in data cache"); fs::create_dir_all(path.parent().expect("path should include cache key in directory name"))?; let mut file = fs::File::create(path)?; let (bytes, _checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?; From 6e79390e519875d19c8b7d3f5b40e86ee6c12f9f Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 26 Oct 2023 18:24:49 +0100 Subject: [PATCH 06/25] WIP: Add checksums to on-disk cache Signed-off-by: Daniel Carl Jones --- Cargo.lock | 13 +++-- mountpoint-s3/Cargo.toml | 4 +- .../src/data_cache/disk_data_cache.rs | 51 ++++++++++++----- mountpoint-s3/src/lib.rs | 1 + mountpoint-s3/src/serde.rs | 56 +++++++++++++++++++ 5 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 mountpoint-s3/src/serde.rs diff --git a/Cargo.lock b/Cargo.lock 
index a19a1c2c1..00f2fc57c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -747,6 +747,9 @@ name = "bytes" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +dependencies = [ + "serde", +] [[package]] name = "bytes-utils" @@ -1951,6 +1954,7 @@ dependencies = [ "aws-sdk-sts", "base16ct", "base64ct", + "bincode", "built", "bytes", "clap 4.3.9", @@ -1977,6 +1981,7 @@ dependencies = [ "rand", "rand_chacha", "regex", + "serde", "serde_json", "sha2", "shuttle", @@ -2813,18 +2818,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.180" +version = "1.0.190" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed" +checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.180" +version = "1.0.190" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036" +checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3" dependencies = [ "proc-macro2 1.0.63", "quote 1.0.29", diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index 6b3bd183c..b87b22792 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -14,7 +14,7 @@ anyhow = { version = "1.0.64", features = ["backtrace"] } async-channel = "1.8.0" async-lock = "2.6.0" async-trait = "0.1.57" -bytes = "1.2.1" +bytes = { version = "1.2.1", features = ["serde"] } clap = { version = "4.1.9", features = ["derive"] } crc32c = "0.6.3" ctrlc = { version = "3.2.3", features = ["termination"] } @@ -37,6 +37,8 @@ time = { version = "0.3.17", features = ["macros", "formatting"] } const_format = "0.2.30" serde_json = "1.0.95" base64ct = { version = "1.6.0", features = ["std"] } +serde = { version = "1.0.190", features = ["derive"] } +bincode = "1.3.3" [target.'cfg(target_os = "linux")'.dependencies] procfs = { version = "0.15.1", default-features = false } diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index a28d3b6ef..9c68f0988 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -6,24 +6,45 @@ use std::ops::RangeBounds; use std::path::PathBuf; use base64ct::{Base64UrlUnpadded, Encoding}; -use bytes::{BufMut, BytesMut}; -use mountpoint_s3_crt::checksums::crc32c; +use bytes::{BufMut, Bytes, BytesMut}; +use serde::{Deserialize, Serialize}; use tracing::{error, trace, warn}; use crate::data_cache::DataCacheError; +use crate::serde::SerializableCrc32c; use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult}; /// On-disk implementation of [DataCache]. /// -/// TODO: Store checksums on disk, reconstruct as [ChecksummedBytes] using same checksum avoiding recomputation. -/// /// TODO: Store additional metadata with each block such as expected S3 key, ETag, etc.. pub struct DiskDataCache { block_size: u64, cache_directory: PathBuf, } +/// Represents a fixed-size chunk of data that can be serialized. 
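// Illustrative sketch (not from this patch series) of the serde + bincode round trip that the
// block type below relies on; `ExampleBlock` is a stand-in rather than the real `DataBlock`.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct ExampleBlock {
    checksum: u32,
    data: Vec<u8>,
}

fn round_trip(block: &ExampleBlock) -> ExampleBlock {
    let encoded: Vec<u8> = bincode::serialize(block).expect("in-memory structs should serialize");
    bincode::deserialize(&encoded).expect("bytes produced by the same version should deserialize")
}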
+#[derive(Serialize, Deserialize, Debug)] +pub struct DataBlock { + checksum: SerializableCrc32c, + data: Bytes, +} + +impl DataBlock { + fn new(bytes: ChecksummedBytes) -> Self { + let (data, checksum) = bytes + .into_inner() + .expect("TODO: what to do if there's an integrity issue"); + let checksum: SerializableCrc32c = checksum.into(); + DataBlock { checksum, data } + } + + /// TODO: Replace with unpack method taking anything we need for validation? + fn data(&self) -> ChecksummedBytes { + ChecksummedBytes::new(self.data.clone(), self.checksum.into()) + } +} + impl DiskDataCache { /// Create a new instance of an [DiskDataCache] with the specified `block_size`. pub fn new(cache_directory: PathBuf, block_size: u64) -> Self { @@ -68,16 +89,20 @@ impl DataCache for DiskDataCache { Err(err) => return Err(err.into()), }; - let bytes = BytesMut::with_capacity(self.block_size as usize); + let bytes = BytesMut::with_capacity(self.block_size as usize); // TODO: fix capacity? let mut writer = bytes.writer(); std::io::copy(&mut file, &mut writer)?; - let bytes = writer.into_inner().freeze(); + let encoded = writer.into_inner().freeze(); - // TODO: Read checksum from block file - let checksum = crc32c::checksum(bytes.as_ref()); - let bytes = ChecksummedBytes::new(bytes, checksum); + let block: DataBlock = match bincode::deserialize(&encoded[..]) { + Ok(block) => block, + Err(e) => { + error!("block could not be deserialized: {:?}", e); + return Err(DataCacheError::InvalidBlockContent); + } + }; - Ok(Some(bytes)) + Ok(Some(block.data())) } fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> { @@ -85,9 +110,9 @@ impl DataCache for DiskDataCache { trace!(?cache_key, ?path, "new block will be created in data cache"); fs::create_dir_all(path.parent().expect("path should include cache key in directory name"))?; let mut file = fs::File::create(path)?; - let (bytes, _checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?; - // TODO: Store checksum - file.write_all(&bytes)?; + let block = DataBlock::new(bytes); + let encoded: Vec = bincode::serialize(&block).expect("todo: why do i expect this to work?"); + file.write_all(&encoded)?; file.sync_data()?; Ok(()) } diff --git a/mountpoint-s3/src/lib.rs b/mountpoint-s3/src/lib.rs index a17cb14b3..f545f062a 100644 --- a/mountpoint-s3/src/lib.rs +++ b/mountpoint-s3/src/lib.rs @@ -8,6 +8,7 @@ pub mod logging; pub mod metrics; pub mod prefetch; pub mod prefix; +mod serde; mod sync; mod upload; diff --git a/mountpoint-s3/src/serde.rs b/mountpoint-s3/src/serde.rs new file mode 100644 index 000000000..66ee0f7be --- /dev/null +++ b/mountpoint-s3/src/serde.rs @@ -0,0 +1,56 @@ +//! Serializers for types we don't own and don't already have a Serialize implementation. +//! +//! The way we get around this is by implementing a wrapping type +//! and using this in structures where we need to serialize/deserialize. + +use std::ops::Deref; + +use mountpoint_s3_crt::checksums::crc32c::Crc32c; +use serde::{Deserialize, Deserializer, Serialize}; + +/// Serializable wrapper for [Crc32c]. +/// +/// [Deref] is implemented to allow most access to the underlying type to be seamless, +/// and [From] is implemented for easy wrapping using `.into()`. +/// +/// [Crc32c] is owned by mountpoint-s3 (under the `mountpoint-s3-crt` crate). +/// An alternative to this wrapping type would be to +/// implement [Serialize]/[Deserialize] directly in that crate under a feature flag. 
+#[derive(Clone, Copy, Debug)] +pub struct SerializableCrc32c(Crc32c); + +impl Deref for SerializableCrc32c { + type Target = Crc32c; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for SerializableCrc32c { + fn from(value: Crc32c) -> Self { + SerializableCrc32c(value) + } +} + +impl From for Crc32c { + fn from(value: SerializableCrc32c) -> Self { + value.0 + } +} + +impl Serialize for SerializableCrc32c { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_u32(self.value()) + } +} + +impl<'de> Deserialize<'de> for SerializableCrc32c { + fn deserialize>(d: D) -> Result { + let checksum = u32::deserialize(d)?; + let checksum = Crc32c::new(checksum); + Ok(checksum.into()) + } +} From bebe52ab1ba65b83a063c555cb8cb10215daa87c Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 15:46:19 +0000 Subject: [PATCH 07/25] Remove cached_block_indices implementation on DiskDataCache Signed-off-by: Daniel Carl Jones --- .../src/data_cache/disk_data_cache.rs | 85 +------------------ 1 file changed, 4 insertions(+), 81 deletions(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 9c68f0988..6de857949 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -8,7 +8,7 @@ use std::path::PathBuf; use base64ct::{Base64UrlUnpadded, Encoding}; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; -use tracing::{error, trace, warn}; +use tracing::{error, trace}; use crate::data_cache::DataCacheError; use crate::serde::SerializableCrc32c; @@ -123,55 +123,10 @@ impl DataCache for DiskDataCache { fn cached_block_indices>( &self, - cache_key: &CacheKey, - range: R, + _cache_key: &CacheKey, + _range: R, ) -> DataCacheResult> { - let path_for_cache_key = self.get_path_for_key(cache_key); - let read_dir = match fs::read_dir(&path_for_cache_key) { - Ok(handle) => handle, - Err(e) if e.kind() == ErrorKind::NotFound => { - trace!( - ?path_for_cache_key, - "no directory for cache key, assuming no block cached yet" - ); - return Ok(Vec::new()); - } - Err(e) => return Err(e.into()), - }; - - let mut indices = Vec::new(); - for entry in read_dir.into_iter() { - if let Err(e) = entry { - return Err(DataCacheError::IoFailure(e)); - } - - let file_name = entry?.file_name(); - let file_name = file_name.to_string_lossy(); - let block_suffix = ".block"; - if file_name.ends_with(block_suffix) { - let end = file_name.len() - block_suffix.len(); - let block_idx = &file_name[..end]; - if let Ok(block_idx) = block_idx.parse::() { - if range.contains(&block_idx) { - indices.push(block_idx); - } - } else { - error!( - path=?path_for_cache_key.join(file_name.as_ref()), - "unexpected file found in cache, name couldn't be parsed for block number" - ); - }; - } else { - warn!( - path=?path_for_cache_key.join(file_name.as_ref()), - "unexpected file found in cache without \".block\" suffix" - ); - } - } - - indices.sort(); - - Ok(indices) + todo!("implement or deprecate"); } } @@ -305,36 +260,4 @@ mod tests { "cache entry returned should match original bytes after put" ); } - - #[test] - fn test_cached_indices() { - let temp_dir = tempfile::tempdir().unwrap(); - let block_size = 1024; - let data_cache = DiskDataCache::new(temp_dir.into_path(), block_size); - - let cache_key = CacheKey { - s3_key: "HelloWorld".into(), - etag: ETag::for_tests(), - }; - let bytes = ChecksummedBytes::from_bytes("Hello 
World".into()); - - let cached_block_indices = data_cache.cached_block_indices(&cache_key, 0..).unwrap(); - assert!(cached_block_indices.is_empty()); - - data_cache.put_block(cache_key.clone(), 5, bytes.clone()).unwrap(); - let cached_block_indices = data_cache.cached_block_indices(&cache_key, 0..).unwrap(); - assert_eq!(cached_block_indices, vec![5]); - let cached_block_indices = data_cache.cached_block_indices(&cache_key, 12..).unwrap(); - assert!(cached_block_indices.is_empty()); - - let another_cache_key = CacheKey { - s3_key: "SomeOtherKey".into(), - etag: ETag::for_tests(), - }; - data_cache.put_block(another_cache_key, 5, bytes.clone()).unwrap(); - let cached_block_indices = data_cache.cached_block_indices(&cache_key, 0..).unwrap(); - assert_eq!(cached_block_indices, vec![5]); - - // TODO: Tests for `remove_block` - } } From c153ac75323c73da7c18aecf9f708f9e53ef1dd0 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 16:14:19 +0000 Subject: [PATCH 08/25] Move version identifier to constant Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 6de857949..abb651fae 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -15,6 +15,9 @@ use crate::serde::SerializableCrc32c; use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult}; +/// Disk and file-layout versioning. +const CACHE_VERSION: &str = "V1"; + /// On-disk implementation of [DataCache]. /// /// TODO: Store additional metadata with each block such as expected S3 key, ETag, etc.. @@ -56,7 +59,7 @@ impl DiskDataCache { /// Get the relative path for the given block. fn get_path_for_key(&self, cache_key: &CacheKey) -> PathBuf { - let mut path = self.cache_directory.join("v1"); + let mut path = self.cache_directory.join(CACHE_VERSION); // An S3 key may be up to 1024 UTF-8 bytes long, which exceeds the maximum UNIX file name length. // Instead, we encode the key and split into 255 character long directory names. 
@@ -155,7 +158,7 @@ mod tests { etag, s3_key: s3_key.to_owned(), }; - let expected = vec!["mountpoint-cache", "v1", &encoded_s3_key, key.etag.as_str()]; + let expected = vec!["mountpoint-cache", CACHE_VERSION, &encoded_s3_key, key.etag.as_str()]; let path = data_cache.get_path_for_key(&key); let results: Vec = path.iter().map(ToOwned::to_owned).collect(); assert_eq!(expected, results); @@ -175,7 +178,7 @@ mod tests { }; let expected = vec![ "mountpoint-cache", - "v1", + CACHE_VERSION, "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\ YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\ YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWF", From 55524b2312677cc24309b39e4598b6996d1c5550 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 16:19:26 +0000 Subject: [PATCH 09/25] Replace SerializableCrc32c with u32 Signed-off-by: Daniel Carl Jones --- .../src/data_cache/disk_data_cache.rs | 8 +-- mountpoint-s3/src/lib.rs | 1 - mountpoint-s3/src/serde.rs | 56 ------------------- 3 files changed, 4 insertions(+), 61 deletions(-) delete mode 100644 mountpoint-s3/src/serde.rs diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index abb651fae..6d6d0f9c3 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -7,11 +7,11 @@ use std::path::PathBuf; use base64ct::{Base64UrlUnpadded, Encoding}; use bytes::{BufMut, Bytes, BytesMut}; +use mountpoint_s3_crt::checksums::crc32c::Crc32c; use serde::{Deserialize, Serialize}; use tracing::{error, trace}; use crate::data_cache::DataCacheError; -use crate::serde::SerializableCrc32c; use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult}; @@ -29,7 +29,7 @@ pub struct DiskDataCache { /// Represents a fixed-size chunk of data that can be serialized. #[derive(Serialize, Deserialize, Debug)] pub struct DataBlock { - checksum: SerializableCrc32c, + checksum: u32, data: Bytes, } @@ -38,13 +38,13 @@ impl DataBlock { let (data, checksum) = bytes .into_inner() .expect("TODO: what to do if there's an integrity issue"); - let checksum: SerializableCrc32c = checksum.into(); + let checksum = checksum.value(); DataBlock { checksum, data } } /// TODO: Replace with unpack method taking anything we need for validation? fn data(&self) -> ChecksummedBytes { - ChecksummedBytes::new(self.data.clone(), self.checksum.into()) + ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.checksum)) } } diff --git a/mountpoint-s3/src/lib.rs b/mountpoint-s3/src/lib.rs index f545f062a..a17cb14b3 100644 --- a/mountpoint-s3/src/lib.rs +++ b/mountpoint-s3/src/lib.rs @@ -8,7 +8,6 @@ pub mod logging; pub mod metrics; pub mod prefetch; pub mod prefix; -mod serde; mod sync; mod upload; diff --git a/mountpoint-s3/src/serde.rs b/mountpoint-s3/src/serde.rs deleted file mode 100644 index 66ee0f7be..000000000 --- a/mountpoint-s3/src/serde.rs +++ /dev/null @@ -1,56 +0,0 @@ -//! Serializers for types we don't own and don't already have a Serialize implementation. -//! -//! The way we get around this is by implementing a wrapping type -//! and using this in structures where we need to serialize/deserialize. - -use std::ops::Deref; - -use mountpoint_s3_crt::checksums::crc32c::Crc32c; -use serde::{Deserialize, Deserializer, Serialize}; - -/// Serializable wrapper for [Crc32c]. 
-/// -/// [Deref] is implemented to allow most access to the underlying type to be seamless, -/// and [From] is implemented for easy wrapping using `.into()`. -/// -/// [Crc32c] is owned by mountpoint-s3 (under the `mountpoint-s3-crt` crate). -/// An alternative to this wrapping type would be to -/// implement [Serialize]/[Deserialize] directly in that crate under a feature flag. -#[derive(Clone, Copy, Debug)] -pub struct SerializableCrc32c(Crc32c); - -impl Deref for SerializableCrc32c { - type Target = Crc32c; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl From for SerializableCrc32c { - fn from(value: Crc32c) -> Self { - SerializableCrc32c(value) - } -} - -impl From for Crc32c { - fn from(value: SerializableCrc32c) -> Self { - value.0 - } -} - -impl Serialize for SerializableCrc32c { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_u32(self.value()) - } -} - -impl<'de> Deserialize<'de> for SerializableCrc32c { - fn deserialize>(d: D) -> Result { - let checksum = u32::deserialize(d)?; - let checksum = Crc32c::new(checksum); - Ok(checksum.into()) - } -} From aa12176047abe4ec3071646a657d80957590a74a Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 16:23:40 +0000 Subject: [PATCH 10/25] Update DataBlock::new(..) to return Result Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 6d6d0f9c3..ff8445201 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -34,10 +34,8 @@ pub struct DataBlock { } impl DataBlock { - fn new(bytes: ChecksummedBytes) -> Self { - let (data, checksum) = bytes - .into_inner() - .expect("TODO: what to do if there's an integrity issue"); + fn new(bytes: ChecksummedBytes) -> DataCacheResult { + let (data, checksum) = bytes.into_inner().map_err(|_e| DataCacheError::InvalidBlockContent)?; let checksum = checksum.value(); DataBlock { checksum, data } } From 524d865da98e9ee5bb2d98cd77b8614c4b329b90 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 16:56:26 +0000 Subject: [PATCH 11/25] Add verification of block metadata to unpack after reading Signed-off-by: Daniel Carl Jones --- mountpoint-s3-client/src/object_client.rs | 5 ++ .../src/data_cache/disk_data_cache.rs | 83 ++++++++++++++++--- 2 files changed, 77 insertions(+), 11 deletions(-) diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs index 77f45d86a..5e58aef7b 100644 --- a/mountpoint-s3-client/src/object_client.rs +++ b/mountpoint-s3-client/src/object_client.rs @@ -31,6 +31,11 @@ impl ETag { &self.etag } + /// Unpack the [String] contained by the enum. + pub fn into_inner(self) -> String { + self.etag + } + /// Creating default etag for tests #[doc(hidden)] pub fn for_tests() -> Self { diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index ff8445201..4a934a8ce 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -19,30 +19,56 @@ use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult}; const CACHE_VERSION: &str = "V1"; /// On-disk implementation of [DataCache]. 
-/// -/// TODO: Store additional metadata with each block such as expected S3 key, ETag, etc.. pub struct DiskDataCache { block_size: u64, cache_directory: PathBuf, } /// Represents a fixed-size chunk of data that can be serialized. +/// +/// TODO: Add checksum over struct (excl. `data`) to verify block metadata later. #[derive(Serialize, Deserialize, Debug)] pub struct DataBlock { + block_idx: BlockIndex, checksum: u32, data: Bytes, + etag: String, + s3_key: String, } impl DataBlock { - fn new(bytes: ChecksummedBytes) -> DataCacheResult { + fn new(cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult { + let (s3_key, etag) = (cache_key.s3_key, cache_key.etag.into_inner()); let (data, checksum) = bytes.into_inner().map_err(|_e| DataCacheError::InvalidBlockContent)?; let checksum = checksum.value(); - DataBlock { checksum, data } + Ok(DataBlock { + block_idx, + checksum, + data, + etag, + s3_key, + }) } - /// TODO: Replace with unpack method taking anything we need for validation? - fn data(&self) -> ChecksummedBytes { - ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.checksum)) + /// Extra the block data, checking that fields such as S3 key, etc. match what we expect. + /// + /// Comparing these fields helps ensure we have not corrupted or swapped block data on disk. + fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult { + let (s3_key, etag) = (cache_key.s3_key.as_str(), &cache_key.etag); + + let s3_key_match = s3_key == self.s3_key; + let etag_match = etag.as_str() == self.etag; + let block_idx_match = block_idx == self.block_idx; + if s3_key_match && etag_match && block_idx_match { + let bytes = ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.checksum)); + Ok(bytes) + } else { + error!( + s3_key_match, + etag_match, block_idx_match, "block data did not match expected values", + ); + Err(DataCacheError::InvalidBlockContent) + } } } @@ -102,16 +128,19 @@ impl DataCache for DiskDataCache { return Err(DataCacheError::InvalidBlockContent); } }; - - Ok(Some(block.data())) + let bytes = block.data(cache_key, block_idx)?; + Ok(Some(bytes)) } fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> { let path = self.get_path_for_block(&cache_key, block_idx); trace!(?cache_key, ?path, "new block will be created in data cache"); - fs::create_dir_all(path.parent().expect("path should include cache key in directory name"))?; + let cache_path_for_key = path.parent().expect("path should include cache key in directory name"); + fs::create_dir_all(cache_path_for_key)?; + + let block = DataBlock::new(cache_key, block_idx, bytes)?; + let mut file = fs::File::create(path)?; - let block = DataBlock::new(bytes); let encoded: Vec = bincode::serialize(&block).expect("todo: why do i expect this to work?"); file.write_all(&encoded)?; file.sync_data()?; @@ -134,6 +163,7 @@ impl DataCache for DiskDataCache { #[cfg(test)] mod tests { use std::ffi::OsString; + use std::str::FromStr; use super::*; @@ -261,4 +291,35 @@ mod tests { "cache entry returned should match original bytes after put" ); } + + #[test] + fn data_block_extract_checks() { + let data_1 = ChecksummedBytes::from_bytes("Foo".into()); + + let cache_key_1 = CacheKey { + s3_key: "a".into(), + etag: ETag::for_tests(), + }; + let cache_key_2 = CacheKey { + s3_key: "b".into(), + etag: ETag::for_tests(), + }; + let cache_key_3 = CacheKey { + s3_key: "a".into(), + etag: ETag::from_str("badetag").unwrap(), + }; + + let block = 
DataBlock::new(cache_key_1.clone(), 0, data_1.clone()).expect("should have no checksum err"); + block + .data(&cache_key_1, 1) + .expect_err("should fail due to incorrect block index"); + block + .data(&cache_key_2, 0) + .expect_err("should fail due to incorrect s3 key in cache key"); + block + .data(&cache_key_3, 0) + .expect_err("should fail due to incorrect etag in cache key"); + let unpacked_bytes = block.data(&cache_key_1, 0).expect("should be OK as all fields match"); + assert_eq!(data_1, unpacked_bytes, "data block should return original bytes"); + } } From 9e8b40788cdbc487f489325d85b24b23fb6ff6ee Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 17:39:37 +0000 Subject: [PATCH 12/25] Replace Base64 encoding with SHA256 hash Signed-off-by: Daniel Carl Jones --- Cargo.lock | 2 +- mountpoint-s3/Cargo.toml | 3 ++- .../src/data_cache/disk_data_cache.rs | 22 ++++++------------- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00f2fc57c..b917a27d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1953,7 +1953,6 @@ dependencies = [ "aws-sdk-s3", "aws-sdk-sts", "base16ct", - "base64ct", "bincode", "built", "bytes", @@ -1967,6 +1966,7 @@ dependencies = [ "fuser", "futures", "hdrhistogram", + "hex", "lazy_static", "libc", "metrics", diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index b87b22792..b8b8c8722 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -36,9 +36,10 @@ nix = "0.26.2" time = { version = "0.3.17", features = ["macros", "formatting"] } const_format = "0.2.30" serde_json = "1.0.95" -base64ct = { version = "1.6.0", features = ["std"] } serde = { version = "1.0.190", features = ["derive"] } bincode = "1.3.3" +sha2 = "0.10.6" +hex = "0.4.3" [target.'cfg(target_os = "linux")'.dependencies] procfs = { version = "0.15.1", default-features = false } diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 4a934a8ce..98fb1d8f6 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -5,10 +5,10 @@ use std::io::{ErrorKind, Write}; use std::ops::RangeBounds; use std::path::PathBuf; -use base64ct::{Base64UrlUnpadded, Encoding}; use bytes::{BufMut, Bytes, BytesMut}; use mountpoint_s3_crt::checksums::crc32c::Crc32c; use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; use tracing::{error, trace}; use crate::data_cache::DataCacheError; @@ -86,15 +86,10 @@ impl DiskDataCache { let mut path = self.cache_directory.join(CACHE_VERSION); // An S3 key may be up to 1024 UTF-8 bytes long, which exceeds the maximum UNIX file name length. - // Instead, we encode the key and split into 255 character long directory names. - let encoded_s3_key = Base64UrlUnpadded::encode_string(cache_key.s3_key.as_bytes()); - let mut slice = encoded_s3_key.as_str(); - while !slice.is_empty() { - let (chunk, remaining) = slice.split_at(255.min(slice.len())); - path.push(chunk); - slice = remaining; - } - + // Instead, we hash the key. + // The risk of collisions is mitigated as we will ignore blocks read that contain the wrong S3 key, etc.. 
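// Illustrative sketch (not from this patch series): hashing yields a fixed-length, filename-safe
// path component no matter how long the S3 key is, so the 255-character splitting loop removed
// later in this hunk is no longer needed.
use sha2::{Digest, Sha256};

fn hashed_path_component(s3_key: &str) -> String {
    let component = hex::encode(Sha256::digest(s3_key.as_bytes()));
    assert_eq!(component.len(), 64); // a 32-byte SHA-256 digest, hex-encoded
    component
}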
+ let encoded_s3_key = hex::encode(Sha256::digest(cache_key.s3_key.as_bytes())); + path.push(encoded_s3_key); path.push(cache_key.etag.as_str()); path } @@ -180,7 +175,7 @@ mod tests { let cache_dir = PathBuf::from("mountpoint-cache/"); let data_cache = DiskDataCache::new(cache_dir, 1024); - let encoded_s3_key = Base64UrlUnpadded::encode_string(s3_key.as_bytes()); + let encoded_s3_key = hex::encode(Sha256::digest(s3_key.as_bytes())); let etag = ETag::for_tests(); let key = CacheKey { etag, @@ -207,10 +202,7 @@ mod tests { let expected = vec![ "mountpoint-cache", CACHE_VERSION, - "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\ - YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\ - YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWF", - "hYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE", + "a7bf334bec6f17021671033b243b8689757212496cd525ba9873addde87b0c56", key.etag.as_str(), ]; let path = data_cache.get_path_for_key(&key); From a62c87bf5b085418ce9018058235c419cddfdc95 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 17:42:50 +0000 Subject: [PATCH 13/25] Add TODO to split directories into sub-directories to avoid hitting any FS-specific max number of dir entries Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 98fb1d8f6..d9717ef5f 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -89,6 +89,8 @@ impl DiskDataCache { // Instead, we hash the key. // The risk of collisions is mitigated as we will ignore blocks read that contain the wrong S3 key, etc.. let encoded_s3_key = hex::encode(Sha256::digest(cache_key.s3_key.as_bytes())); + // TODO: Split directory into subdirectories. + // Take the first few chars of hash to avoid hitting any FS-specific maximum number of directory entries. path.push(encoded_s3_key); path.push(cache_key.etag.as_str()); path From 6f55978b2c573da9c6a65fed7e7b8348d2fe5a31 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 17:54:05 +0000 Subject: [PATCH 14/25] Remove intermediate buffers when (de)serializing DataBlock with bincode Signed-off-by: Daniel Carl Jones --- .../src/data_cache/disk_data_cache.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index d9717ef5f..ea2f60edb 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -5,7 +5,7 @@ use std::io::{ErrorKind, Write}; use std::ops::RangeBounds; use std::path::PathBuf; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::Bytes; use mountpoint_s3_crt::checksums::crc32c::Crc32c; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -113,12 +113,7 @@ impl DataCache for DiskDataCache { Err(err) => return Err(err.into()), }; - let bytes = BytesMut::with_capacity(self.block_size as usize); // TODO: fix capacity? 
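// Illustrative sketch (not from this patch series) of the streaming (de)serialization this change
// switches to: bincode writes into and reads from the file handle directly, so the intermediate
// BytesMut buffer being removed here is unnecessary. The payload type is just a stand-in.
use std::fs::File;

fn write_then_read(path: &str, payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut file = File::create(path)?;
    bincode::serialize_into(&mut file, payload).expect("serialize straight into the file");
    let file = File::open(path)?;
    Ok(bincode::deserialize_from(&file).expect("deserialize straight from the file"))
}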
- let mut writer = bytes.writer(); - std::io::copy(&mut file, &mut writer)?; - let encoded = writer.into_inner().freeze(); - - let block: DataBlock = match bincode::deserialize(&encoded[..]) { + let block: DataBlock = match bincode::deserialize_from(&file) { Ok(block) => block, Err(e) => { error!("block could not be deserialized: {:?}", e); @@ -138,9 +133,13 @@ impl DataCache for DiskDataCache { let block = DataBlock::new(cache_key, block_idx, bytes)?; let mut file = fs::File::create(path)?; - let encoded: Vec = bincode::serialize(&block).expect("todo: why do i expect this to work?"); - file.write_all(&encoded)?; - file.sync_data()?; + let serialize_result = bincode::serialize_into(&mut file, &block); + if let Err(err) = serialize_result { + return match *err { + bincode::ErrorKind::Io(io_err) => return Err(DataCacheError::from(io_err)), + _ => Err(DataCacheError::InvalidBlockContent), + }; + }; Ok(()) } From ec5814c71318a1f07e3afd7d1808a9c6a790b2f0 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 18:12:02 +0000 Subject: [PATCH 15/25] Add cache version identifer to the start of blocks written to disk Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index ea2f60edb..3321102d4 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -1,7 +1,7 @@ //! Module for the on-disk data cache implementation. use std::fs; -use std::io::{ErrorKind, Write}; +use std::io::{ErrorKind, Read, Write}; use std::ops::RangeBounds; use std::path::PathBuf; @@ -107,12 +107,19 @@ impl DiskDataCache { impl DataCache for DiskDataCache { fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult> { let path = self.get_path_for_block(cache_key, block_idx); - let mut file = match fs::File::open(path) { + let mut file = match fs::File::open(&path) { Ok(file) => file, Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None), Err(err) => return Err(err.into()), }; + let mut block_version = [0; CACHE_VERSION.len()]; + file.read_exact(&mut block_version)?; + if block_version != CACHE_VERSION.as_bytes() { + error!(found_version = ?block_version, ?path, "stale block format found during reading"); + return Err(DataCacheError::InvalidBlockContent); + } + let block: DataBlock = match bincode::deserialize_from(&file) { Ok(block) => block, Err(e) => { @@ -133,6 +140,7 @@ impl DataCache for DiskDataCache { let block = DataBlock::new(cache_key, block_idx, bytes)?; let mut file = fs::File::create(path)?; + file.write_all(CACHE_VERSION.as_bytes())?; let serialize_result = bincode::serialize_into(&mut file, &block); if let Err(err) = serialize_result { return match *err { From 3d9e8b1ffb7c7e25689c78037f92002f163ea365 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 19:01:26 +0000 Subject: [PATCH 16/25] Fix comment on ETag::into_inner Signed-off-by: Daniel Carl Jones --- mountpoint-s3-client/src/object_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs index 5e58aef7b..ea2e87018 100644 --- a/mountpoint-s3-client/src/object_client.rs +++ b/mountpoint-s3-client/src/object_client.rs @@ -31,7 +31,7 @@ impl ETag { &self.etag } - /// Unpack the [String] contained by the enum. 
+ /// Unpack the [String] contained by the [ETag] wrapper pub fn into_inner(self) -> String { self.etag } From f5b134a50f1e62053ee89da9862a15408eed0e4f Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 19:04:33 +0000 Subject: [PATCH 17/25] Add rustdoc to DataBlock::new Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 3321102d4..3e08c9748 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -37,6 +37,10 @@ pub struct DataBlock { } impl DataBlock { + /// Create a new [DataBlock]. + /// + /// This may return an integrity error if the checksummed byte buffer is found to be corrupt. + /// However, this check is not guaranteed and it shouldn't be assumed that the data within the block is not corrupt. fn new(cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult { let (s3_key, etag) = (cache_key.s3_key, cache_key.etag.into_inner()); let (data, checksum) = bytes.into_inner().map_err(|_e| DataCacheError::InvalidBlockContent)?; From 5debfc5788dd3586026a35a2da070cd92a070df3 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 19:05:11 +0000 Subject: [PATCH 18/25] Fix typo in rustdoc for DataBlock::data Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 3e08c9748..f960c686c 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -54,7 +54,7 @@ impl DataBlock { }) } - /// Extra the block data, checking that fields such as S3 key, etc. match what we expect. + /// Extract the block data, checking that fields such as S3 key, etc. match what we expect. /// /// Comparing these fields helps ensure we have not corrupted or swapped block data on disk. 
fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult { From 56efa0c53fbded35a1aefab6914b32415449cce6 Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Thu, 2 Nov 2023 19:07:15 +0000 Subject: [PATCH 19/25] Add expected version to data block read error message Signed-off-by: Daniel Carl Jones --- mountpoint-s3/src/data_cache/disk_data_cache.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index f960c686c..20218ffe5 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -120,7 +120,10 @@ impl DataCache for DiskDataCache { let mut block_version = [0; CACHE_VERSION.len()]; file.read_exact(&mut block_version)?; if block_version != CACHE_VERSION.as_bytes() { - error!(found_version = ?block_version, ?path, "stale block format found during reading"); + error!( + found_version = ?block_version, expected_version = ?CACHE_VERSION, ?path, + "stale block format found during reading" + ); return Err(DataCacheError::InvalidBlockContent); } From a429d393f34cc354e2a8ebbbdd8ecee99f8363dc Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Fri, 3 Nov 2023 08:34:34 +0000 Subject: [PATCH 20/25] Split DataBlock header fields into BlockHeader Signed-off-by: Daniel Carl Jones --- .../src/data_cache/disk_data_cache.rs | 103 +++++++++++++----- 1 file changed, 77 insertions(+), 26 deletions(-) diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index 20218ffe5..fc2ae4f75 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -24,18 +24,79 @@ pub struct DiskDataCache { cache_directory: PathBuf, } -/// Represents a fixed-size chunk of data that can be serialized. +/// Describes additional information about the data stored in the block. +/// +/// It should be written alongside the block's data +/// and used to verify it contains the correct contents to avoid blocks being mixed up. /// -/// TODO: Add checksum over struct (excl. `data`) to verify block metadata later. +/// TODO: Add checksum over struct to verify block metadata later. #[derive(Serialize, Deserialize, Debug)] -pub struct DataBlock { +struct BlockHeader { block_idx: BlockIndex, - checksum: u32, - data: Bytes, etag: String, s3_key: String, } +enum BlockHeaderError { + /// The header's fields or its checksum is corrupt + IntegrityError, + /// One or more of the fields in this header were incorrect + FieldMismatchError, +} + +impl From for DataCacheError { + fn from(value: BlockHeaderError) -> Self { + match value { + BlockHeaderError::IntegrityError | BlockHeaderError::FieldMismatchError => { + DataCacheError::InvalidBlockContent + } + } + } +} + +impl BlockHeader { + /// Creates a new [BlockHeader] + fn new(block_idx: BlockIndex, etag: String, s3_key: String) -> Self { + BlockHeader { + block_idx, + etag, + s3_key, + } + } + + /// Validate the integrity of the contained data. + /// + /// Execute this method before acting on the data contained within. + /// + /// TODO: Validate a checksum associated with the header content. 
+    fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), BlockHeaderError> {
+        let s3_key_match = s3_key == self.s3_key;
+        let etag_match = etag == self.etag;
+        let block_idx_match = block_idx == self.block_idx;
+
+        if s3_key_match && etag_match && block_idx_match {
+            Ok(())
+        } else {
+            error!(
+                s3_key_match,
+                etag_match, block_idx_match, "block data did not match expected values",
+            );
+            Err(BlockHeaderError::FieldMismatchError)
+        }
+    }
+}
+
+/// Represents a fixed-size chunk of data that can be serialized.
+#[derive(Serialize, Deserialize, Debug)]
+struct DataBlock {
+    /// Information describing the content of `data`, to be used to verify correctness
+    header: BlockHeader,
+    /// Cached bytes
+    data: Bytes,
+    /// Checksum over the bytes in `data`
+    data_checksum: u32,
+}
+
 impl DataBlock {
     /// Create a new [DataBlock].
     ///
     /// This may return an integrity error if the checksummed byte buffer is found to be corrupt.
     /// However, this check is not guaranteed and it shouldn't be assumed that the data within the block is not corrupt.
     fn new(cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<Self> {
         let (s3_key, etag) = (cache_key.s3_key, cache_key.etag.into_inner());
-        let (data, checksum) = bytes.into_inner().map_err(|_e| DataCacheError::InvalidBlockContent)?;
-        let checksum = checksum.value();
+        let header = BlockHeader::new(block_idx, etag, s3_key);
+
+        let (data, data_checksum) = bytes.into_inner().map_err(|_e| DataCacheError::InvalidBlockContent)?;
+        let data_checksum = data_checksum.value();
+
         Ok(DataBlock {
-            block_idx,
-            checksum,
             data,
-            etag,
-            s3_key,
+            data_checksum,
+            header,
         })
     }
@@ -58,21 +120,10 @@ impl DataBlock {
     ///
     /// Comparing these fields helps ensure we have not corrupted or swapped block data on disk.
     fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult<ChecksummedBytes> {
-        let (s3_key, etag) = (cache_key.s3_key.as_str(), &cache_key.etag);
-
-        let s3_key_match = s3_key == self.s3_key;
-        let etag_match = etag.as_str() == self.etag;
-        let block_idx_match = block_idx == self.block_idx;
-        if s3_key_match && etag_match && block_idx_match {
-            let bytes = ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.checksum));
-            Ok(bytes)
-        } else {
-            error!(
-                s3_key_match,
-                etag_match, block_idx_match, "block data did not match expected values",
-            );
-            Err(DataCacheError::InvalidBlockContent)
-        }
+        self.header
+            .validate(cache_key.s3_key.as_str(), cache_key.etag.as_str(), block_idx)?;
+        let bytes = ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.data_checksum));
+        Ok(bytes)
     }
 }

From b05936d4ff7764cf1ba10c170316aea015db7abf Mon Sep 17 00:00:00 2001
From: Daniel Carl Jones
Date: Fri, 3 Nov 2023 08:58:39 +0000
Subject: [PATCH 21/25] Add checksum validation on on-disk cache DataBlock header contents

Signed-off-by: Daniel Carl Jones
---
 .../src/data_cache/disk_data_cache.rs | 59 +++++++++++++++++--
 1 file changed, 53 insertions(+), 6 deletions(-)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index fc2ae4f75..336f1dcde 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -6,7 +6,7 @@ use std::ops::RangeBounds;
 use std::path::PathBuf;
 
 use bytes::Bytes;
-use mountpoint_s3_crt::checksums::crc32c::Crc32c;
+use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
 use tracing::{error, trace};
@@ -35,8 +35,10 @@ struct BlockHeader {
     block_idx: BlockIndex,
     etag: String,
     s3_key: String,
+    header_checksum: u32,
 }
 
+#[derive(Debug)]
 enum BlockHeaderError {
     /// The header's fields or its checksum is corrupt
     IntegrityError,
     /// One or more of the fields in this header were incorrect
     FieldMismatchError,
@@ -56,26 +58,38 @@ impl From<BlockHeaderError> for DataCacheError {
 
 impl BlockHeader {
     /// Creates a new [BlockHeader]
-    fn new(block_idx: BlockIndex, etag: String, s3_key: String) -> Self {
+    pub fn new(block_idx: BlockIndex, etag: String, s3_key: String) -> Self {
+        let header_checksum = Self::compute_checksum(block_idx, &etag, &s3_key).value();
         BlockHeader {
             block_idx,
             etag,
             s3_key,
+            header_checksum,
         }
     }
 
+    fn compute_checksum(block_idx: BlockIndex, etag: &str, s3_key: &str) -> Crc32c {
+        let mut hasher = crc32c::Hasher::new();
+        hasher.update(block_idx.to_be_bytes().as_ref());
+        hasher.update(etag.as_bytes());
+        hasher.update(s3_key.as_bytes());
+        hasher.finalize()
+    }
+
     /// Validate the integrity of the contained data.
     ///
     /// Execute this method before acting on the data contained within.
-    ///
-    /// TODO: Validate a checksum associated with the header content.
-    fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), BlockHeaderError> {
+    pub fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), BlockHeaderError> {
         let s3_key_match = s3_key == self.s3_key;
         let etag_match = etag == self.etag;
         let block_idx_match = block_idx == self.block_idx;
 
         if s3_key_match && etag_match && block_idx_match {
-            Ok(())
+            if Self::compute_checksum(block_idx, etag, s3_key).value() != self.header_checksum {
+                Err(BlockHeaderError::IntegrityError)
+            } else {
+                Ok(())
+            }
         } else {
             error!(
                 s3_key_match,
                 etag_match, block_idx_match, "block data did not match expected values",
             );
@@ -381,4 +395,37 @@ mod tests {
         let unpacked_bytes = block.data(&cache_key_1, 0).expect("should be OK as all fields match");
         assert_eq!(data_1, unpacked_bytes, "data block should return original bytes");
     }
+
+    #[test]
+    fn validate_block_header() {
+        let block_idx = 0;
+        let etag = ETag::for_tests();
+        let s3_key = String::from("s3/key");
+        let mut header = BlockHeader::new(block_idx, etag.as_str().to_owned(), s3_key.clone());
+
+        header
+            .validate(&s3_key, etag.as_str(), block_idx)
+            .expect("should be OK with valid fields and checksum");
+
+        // Bad fields
+        let err = header
+            .validate("hello", etag.as_str(), block_idx)
+            .expect_err("should fail with invalid s3_key");
+        assert!(matches!(err, BlockHeaderError::FieldMismatchError));
+        let err = header
+            .validate(&s3_key, "bad etag", block_idx)
+            .expect_err("should fail with invalid etag");
+        assert!(matches!(err, BlockHeaderError::FieldMismatchError));
+        let err = header
+            .validate(&s3_key, etag.as_str(), 5)
+            .expect_err("should fail with invalid block idx");
+        assert!(matches!(err, BlockHeaderError::FieldMismatchError));
+
+        // Bad checksum
+        header.header_checksum = 23;
+        let err = header
+            .validate(&s3_key, etag.as_str(), block_idx)
+            .expect_err("should fail with invalid checksum");
+        assert!(matches!(err, BlockHeaderError::IntegrityError));
+    }
 }

From 6dd6b16d92085c6979a71ca2413ed46715a3c6e0 Mon Sep 17 00:00:00 2001
From: Daniel Carl Jones
Date: Fri, 3 Nov 2023 10:38:47 +0000
Subject: [PATCH 22/25] Remove outdated TODO

Signed-off-by: Daniel Carl Jones
---
 mountpoint-s3/src/data_cache/disk_data_cache.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index 336f1dcde..cc1a950c1 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -28,8 +28,6 @@ pub struct DiskDataCache {
 ///
 /// It should be written alongside the block's data
 /// and used to verify it contains the correct contents to avoid blocks being mixed up.
-///
-/// TODO: Add checksum over struct to verify block metadata later.
 #[derive(Serialize, Deserialize, Debug)]
 struct BlockHeader {
     block_idx: BlockIndex,

From 4a49d2cb1cbd31f5c1c1d0698fde1678bd2abb4c Mon Sep 17 00:00:00 2001
From: Daniel Carl Jones
Date: Fri, 3 Nov 2023 11:40:15 +0000
Subject: [PATCH 23/25] Add test for detecting when DataBlock requires version bump

Signed-off-by: Daniel Carl Jones
---
 .../src/data_cache/disk_data_cache.rs | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index cc1a950c1..e89cdd03c 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -244,6 +244,26 @@ mod tests {
     use mountpoint_s3_client::types::ETag;
     use test_case::test_case;
 
+    #[test]
+    fn test_block_format_version_requires_update() {
+        let data = ChecksummedBytes::from_bytes("Foo".into());
+        let cache_key = CacheKey {
+            etag: ETag::for_tests(),
+            s3_key: String::from("hello-world"),
+        };
+        let block = DataBlock::new(cache_key, 100, data).expect("should succeed as data checksum is valid");
+        let expected_bytes: Vec<u8> = vec![
+            100, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0,
+            0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 144, 51, 75, 183, 3, 0, 0, 0, 0, 0, 0, 0,
+            70, 111, 111, 9, 85, 128, 46,
+        ];
+        let serialized_bytes = bincode::serialize(&block).unwrap();
+        assert_eq!(
+            expected_bytes, serialized_bytes,
+            "serialized disk format appears to have changed, version bump required"
+        );
+    }
+
     #[test_case("hello"; "simple string")]
     #[test_case("foo/bar/baz"; "with forward slashes")]
     #[test_case("hello+world"; "with plus char")]

From 6f1f17cd75a811dae95d0a8083f27a68395f0eaf Mon Sep 17 00:00:00 2001
From: Daniel Carl Jones
Date: Fri, 3 Nov 2023 11:48:01 +0000
Subject: [PATCH 24/25] Refactor errors for DataBlock

Signed-off-by: Daniel Carl Jones
---
 .../src/data_cache/disk_data_cache.rs | 64 +++++++++++--------
 1 file changed, 38 insertions(+), 26 deletions(-)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index e89cdd03c..fb83b6e2f 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -9,8 +9,10 @@ use bytes::Bytes;
 use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
+use thiserror::Error;
 use tracing::{error, trace};
 
+use crate::checksums::IntegrityError;
 use crate::data_cache::DataCacheError;
 
 use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult};
@@ -36,22 +38,21 @@ struct BlockHeader {
     header_checksum: u32,
 }
 
-#[derive(Debug)]
-enum BlockHeaderError {
-    /// The header's fields or its checksum is corrupt
-    IntegrityError,
-    /// One or more of the fields in this header were incorrect
-    FieldMismatchError,
+/// Error during creation of a [DataBlock]
+#[derive(Debug, Error)]
+enum DataBlockCreationError {
+    /// Data corruption detected when unpacking bytes and checksum
+    #[error(transparent)]
+    IntegrityError(#[from] IntegrityError),
 }
 
-impl From<BlockHeaderError> for DataCacheError {
-    fn from(value: BlockHeaderError) -> Self {
-        match value {
-            BlockHeaderError::IntegrityError | BlockHeaderError::FieldMismatchError => {
-                DataCacheError::InvalidBlockContent
-            }
-        }
-    }
+/// Error during access to a [DataBlock]
+#[derive(Debug, Error)]
+enum DataBlockAccessError {
+    #[error("checksum over the block's fields did not match the field content")]
+    ChecksumError,
+    #[error("one or more of the fields in this block were incorrect")]
+    FieldMismatchError,
 }
 
 impl BlockHeader {
     /// Creates a new [BlockHeader]
@@ -77,14 +78,14 @@ impl BlockHeader {
     /// Validate the integrity of the contained data.
     ///
     /// Execute this method before acting on the data contained within.
-    pub fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), BlockHeaderError> {
+    pub fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), DataBlockAccessError> {
         let s3_key_match = s3_key == self.s3_key;
         let etag_match = etag == self.etag;
         let block_idx_match = block_idx == self.block_idx;
 
         if s3_key_match && etag_match && block_idx_match {
             if Self::compute_checksum(block_idx, etag, s3_key).value() != self.header_checksum {
-                Err(BlockHeaderError::IntegrityError)
+                Err(DataBlockAccessError::ChecksumError)
             } else {
                 Ok(())
             }
@@ -93,7 +94,7 @@ impl BlockHeader {
                 s3_key_match,
                 etag_match, block_idx_match, "block data did not match expected values",
             );
-            Err(BlockHeaderError::FieldMismatchError)
+            Err(DataBlockAccessError::FieldMismatchError)
         }
     }
 }
@@ -114,11 +115,15 @@ impl DataBlock {
     ///
     /// This may return an integrity error if the checksummed byte buffer is found to be corrupt.
     /// However, this check is not guaranteed and it shouldn't be assumed that the data within the block is not corrupt.
-    fn new(cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<Self> {
+    fn new(
+        cache_key: CacheKey,
+        block_idx: BlockIndex,
+        bytes: ChecksummedBytes,
+    ) -> Result<Self, DataBlockCreationError> {
         let (s3_key, etag) = (cache_key.s3_key, cache_key.etag.into_inner());
         let header = BlockHeader::new(block_idx, etag, s3_key);
 
-        let (data, data_checksum) = bytes.into_inner().map_err(|_e| DataCacheError::InvalidBlockContent)?;
+        let (data, data_checksum) = bytes.into_inner()?;
         let data_checksum = data_checksum.value();
 
         Ok(DataBlock {
@@ -131,7 +136,7 @@ impl DataBlock {
     /// Extract the block data, checking that fields such as S3 key, etc. match what we expect.
     ///
     /// Comparing these fields helps ensure we have not corrupted or swapped block data on disk.
-    fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult<ChecksummedBytes> {
+    fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> Result<ChecksummedBytes, DataBlockAccessError> {
         self.header
             .validate(cache_key.s3_key.as_str(), cache_key.etag.as_str(), block_idx)?;
         let bytes = ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.data_checksum));
         Ok(bytes)
@@ -197,7 +202,12 @@ impl DataCache for DiskDataCache {
                 return Err(DataCacheError::InvalidBlockContent);
             }
         };
-        let bytes = block.data(cache_key, block_idx)?;
+        let bytes = block.data(cache_key, block_idx).map_err(|err| match err {
+            DataBlockAccessError::ChecksumError | DataBlockAccessError::FieldMismatchError => {
+                DataCacheError::InvalidBlockContent
+            }
+        })?;
+
         Ok(Some(bytes))
     }
 
@@ -207,7 +217,9 @@ impl DataCache for DiskDataCache {
         let cache_path_for_key = path.parent().expect("path should include cache key in directory name");
         fs::create_dir_all(cache_path_for_key)?;
 
-        let block = DataBlock::new(cache_key, block_idx, bytes)?;
+        let block = DataBlock::new(cache_key, block_idx, bytes).map_err(|err| match err {
+            DataBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
+        })?;
 
         let mut file = fs::File::create(path)?;
         file.write_all(CACHE_VERSION.as_bytes())?;
@@ -429,21 +441,21 @@ mod tests {
         let err = header
             .validate("hello", etag.as_str(), block_idx)
             .expect_err("should fail with invalid s3_key");
-        assert!(matches!(err, BlockHeaderError::FieldMismatchError));
+        assert!(matches!(err, DataBlockAccessError::FieldMismatchError));
         let err = header
             .validate(&s3_key, "bad etag", block_idx)
             .expect_err("should fail with invalid etag");
-        assert!(matches!(err, BlockHeaderError::FieldMismatchError));
+        assert!(matches!(err, DataBlockAccessError::FieldMismatchError));
         let err = header
             .validate(&s3_key, etag.as_str(), 5)
             .expect_err("should fail with invalid block idx");
-        assert!(matches!(err, BlockHeaderError::FieldMismatchError));
+        assert!(matches!(err, DataBlockAccessError::FieldMismatchError));
 
         // Bad checksum
         header.header_checksum = 23;
         let err = header
             .validate(&s3_key, etag.as_str(), block_idx)
             .expect_err("should fail with invalid checksum");
-        assert!(matches!(err, BlockHeaderError::IntegrityError));
+        assert!(matches!(err, DataBlockAccessError::ChecksumError));
     }
 }

From 86841a436c7a2ce4d621c7869811218b64938104 Mon Sep 17 00:00:00 2001
From: Daniel Carl Jones
Date: Fri, 3 Nov 2023 11:53:42 +0000
Subject: [PATCH 25/25] Rename DataBlock to DiskBlock

Signed-off-by: Daniel Carl Jones
---
 .../src/data_cache/disk_data_cache.rs | 62 +++++++++---------
 1 file changed, 31 insertions(+), 31 deletions(-)

diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index fb83b6e2f..864db62e1 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -31,35 +31,35 @@ pub struct DiskDataCache {
 /// It should be written alongside the block's data
 /// and used to verify it contains the correct contents to avoid blocks being mixed up.
 #[derive(Serialize, Deserialize, Debug)]
-struct BlockHeader {
+struct DiskBlockHeader {
     block_idx: BlockIndex,
     etag: String,
     s3_key: String,
     header_checksum: u32,
 }
 
-/// Error during creation of a [DataBlock]
+/// Error during creation of a [DiskBlock]
 #[derive(Debug, Error)]
-enum DataBlockCreationError {
+enum DiskBlockCreationError {
     /// Data corruption detected when unpacking bytes and checksum
     #[error(transparent)]
     IntegrityError(#[from] IntegrityError),
 }
 
-/// Error during access to a [DataBlock]
+/// Error during access to a [DiskBlock]
 #[derive(Debug, Error)]
-enum DataBlockAccessError {
+enum DiskBlockAccessError {
     #[error("checksum over the block's fields did not match the field content")]
     ChecksumError,
     #[error("one or more of the fields in this block were incorrect")]
     FieldMismatchError,
 }
 
-impl BlockHeader {
-    /// Creates a new [BlockHeader]
+impl DiskBlockHeader {
+    /// Creates a new [DiskBlockHeader]
     pub fn new(block_idx: BlockIndex, etag: String, s3_key: String) -> Self {
         let header_checksum = Self::compute_checksum(block_idx, &etag, &s3_key).value();
-        BlockHeader {
+        DiskBlockHeader {
             block_idx,
             etag,
             s3_key,
@@ -78,14 +78,14 @@ impl BlockHeader {
     /// Validate the integrity of the contained data.
     ///
     /// Execute this method before acting on the data contained within.
-    pub fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), DataBlockAccessError> {
+    pub fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), DiskBlockAccessError> {
         let s3_key_match = s3_key == self.s3_key;
         let etag_match = etag == self.etag;
         let block_idx_match = block_idx == self.block_idx;
 
         if s3_key_match && etag_match && block_idx_match {
             if Self::compute_checksum(block_idx, etag, s3_key).value() != self.header_checksum {
-                Err(DataBlockAccessError::ChecksumError)
+                Err(DiskBlockAccessError::ChecksumError)
             } else {
                 Ok(())
             }
@@ -94,24 +94,24 @@ impl BlockHeader {
                 s3_key_match,
                 etag_match, block_idx_match, "block data did not match expected values",
             );
-            Err(DataBlockAccessError::FieldMismatchError)
+            Err(DiskBlockAccessError::FieldMismatchError)
         }
     }
 }
 
 /// Represents a fixed-size chunk of data that can be serialized.
 #[derive(Serialize, Deserialize, Debug)]
-struct DataBlock {
+struct DiskBlock {
     /// Information describing the content of `data`, to be used to verify correctness
-    header: BlockHeader,
+    header: DiskBlockHeader,
     /// Cached bytes
     data: Bytes,
     /// Checksum over the bytes in `data`
     data_checksum: u32,
 }
 
-impl DataBlock {
-    /// Create a new [DataBlock].
+impl DiskBlock {
+    /// Create a new [DiskBlock].
     ///
     /// This may return an integrity error if the checksummed byte buffer is found to be corrupt.
     /// However, this check is not guaranteed and it shouldn't be assumed that the data within the block is not corrupt.
@@ -119,14 +119,14 @@ impl DataBlock {
         cache_key: CacheKey,
         block_idx: BlockIndex,
         bytes: ChecksummedBytes,
-    ) -> Result<Self, DataBlockCreationError> {
+    ) -> Result<Self, DiskBlockCreationError> {
         let (s3_key, etag) = (cache_key.s3_key, cache_key.etag.into_inner());
-        let header = BlockHeader::new(block_idx, etag, s3_key);
+        let header = DiskBlockHeader::new(block_idx, etag, s3_key);
 
         let (data, data_checksum) = bytes.into_inner()?;
         let data_checksum = data_checksum.value();
 
-        Ok(DataBlock {
+        Ok(DiskBlock {
             data,
             data_checksum,
             header,
@@ -136,7 +136,7 @@ impl DataBlock {
     /// Extract the block data, checking that fields such as S3 key, etc. match what we expect.
     ///
     /// Comparing these fields helps ensure we have not corrupted or swapped block data on disk.
-    fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> Result<ChecksummedBytes, DataBlockAccessError> {
+    fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> Result<ChecksummedBytes, DiskBlockAccessError> {
         self.header
             .validate(cache_key.s3_key.as_str(), cache_key.etag.as_str(), block_idx)?;
         let bytes = ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.data_checksum));
@@ -195,7 +195,7 @@ impl DataCache for DiskDataCache {
             return Err(DataCacheError::InvalidBlockContent);
         }
 
-        let block: DataBlock = match bincode::deserialize_from(&file) {
+        let block: DiskBlock = match bincode::deserialize_from(&file) {
             Ok(block) => block,
             Err(e) => {
                 error!("block could not be deserialized: {:?}", e);
@@ -203,7 +203,7 @@ impl DataCache for DiskDataCache {
             }
         };
         let bytes = block.data(cache_key, block_idx).map_err(|err| match err {
-            DataBlockAccessError::ChecksumError | DataBlockAccessError::FieldMismatchError => {
+            DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => {
                 DataCacheError::InvalidBlockContent
             }
         })?;
@@ -213,12 +213,12 @@ impl DataCache for DiskDataCache {
     fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> {
         let path = self.get_path_for_block(&cache_key, block_idx);
-        trace!(?cache_key, ?path, "new block will be created in data cache");
+        trace!(?cache_key, ?path, "new block will be created in disk cache");
         let cache_path_for_key = path.parent().expect("path should include cache key in directory name");
         fs::create_dir_all(cache_path_for_key)?;
 
-        let block = DataBlock::new(cache_key, block_idx, bytes).map_err(|err| match err {
-            DataBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
+        let block = DiskBlock::new(cache_key, block_idx, bytes).map_err(|err| match err {
+            DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
         })?;
 
         let mut file = fs::File::create(path)?;
@@ -263,7 +263,7 @@ mod tests {
             etag: ETag::for_tests(),
             s3_key: String::from("hello-world"),
         };
-        let block = DataBlock::new(cache_key, 100, data).expect("should succeed as data checksum is valid");
+        let block = DiskBlock::new(cache_key, 100, data).expect("should succeed as data checksum is valid");
         let expected_bytes: Vec<u8> = vec![
             100, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0,
             0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 144, 51, 75, 183, 3, 0, 0, 0, 0, 0, 0, 0,
             70, 111, 111, 9, 85, 128, 46,
         ];
@@ -412,7 +412,7 @@ mod tests {
             etag: ETag::from_str("badetag").unwrap(),
         };
 
-        let block = DataBlock::new(cache_key_1.clone(), 0, data_1.clone()).expect("should have no checksum err");
+        let block = DiskBlock::new(cache_key_1.clone(), 0, data_1.clone()).expect("should have no checksum err");
         block
             .data(&cache_key_1, 1)
             .expect_err("should fail due to incorrect block index");
@@ -431,7 +431,7 @@ mod tests {
         let block_idx = 0;
         let etag = ETag::for_tests();
         let s3_key = String::from("s3/key");
-        let mut header = BlockHeader::new(block_idx, etag.as_str().to_owned(), s3_key.clone());
+        let mut header = DiskBlockHeader::new(block_idx, etag.as_str().to_owned(), s3_key.clone());
 
         header
             .validate(&s3_key, etag.as_str(), block_idx)
             .expect("should be OK with valid fields and checksum");
 
         // Bad fields
         let err = header
             .validate("hello", etag.as_str(), block_idx)
             .expect_err("should fail with invalid s3_key");
-        assert!(matches!(err, DataBlockAccessError::FieldMismatchError));
+        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
         let err = header
             .validate(&s3_key, "bad etag", block_idx)
             .expect_err("should fail with invalid etag");
-        assert!(matches!(err, DataBlockAccessError::FieldMismatchError));
+        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
         let err = header
             .validate(&s3_key, etag.as_str(), 5)
             .expect_err("should fail with invalid block idx");
-        assert!(matches!(err, DataBlockAccessError::FieldMismatchError));
+        assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
 
         // Bad checksum
         header.header_checksum = 23;
         let err = header
             .validate(&s3_key, etag.as_str(), block_idx)
             .expect_err("should fail with invalid checksum");
-        assert!(matches!(err, DataBlockAccessError::ChecksumError));
+        assert!(matches!(err, DiskBlockAccessError::ChecksumError));
     }
 }
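
For readers following the header-checksum scheme introduced in PATCH 21 and reworked in PATCHES 24-25, the sketch below shows the same validate-before-use flow in a self-contained form. It is illustrative only and not part of the patch series: the names (Header, AccessError, stand_in_checksum) are invented for the example, and a simple FNV-1a hash stands in for the CRC32C implementation that the real code takes from mountpoint_s3_crt.

// Illustrative sketch only -- not part of the patches above.
// A stand-in FNV-1a hash replaces the CRC32C used by the real cache.
fn stand_in_checksum(parts: &[&[u8]]) -> u32 {
    let mut hash: u32 = 0x811c9dc5;
    for part in parts {
        for byte in *part {
            hash ^= *byte as u32;
            hash = hash.wrapping_mul(0x0100_0193);
        }
    }
    hash
}

/// Mirrors the idea of DiskBlockHeader: identifying fields plus a checksum over those fields.
struct Header {
    block_idx: u64,
    etag: String,
    s3_key: String,
    header_checksum: u32,
}

#[derive(Debug)]
enum AccessError {
    ChecksumError,
    FieldMismatchError,
}

impl Header {
    fn compute_checksum(block_idx: u64, etag: &str, s3_key: &str) -> u32 {
        let idx = block_idx.to_be_bytes();
        stand_in_checksum(&[&idx[..], etag.as_bytes(), s3_key.as_bytes()])
    }

    fn new(block_idx: u64, etag: String, s3_key: String) -> Self {
        // Seal the identifying fields with a checksum at creation time.
        let header_checksum = Self::compute_checksum(block_idx, &etag, &s3_key);
        Header { block_idx, etag, s3_key, header_checksum }
    }

    /// Check the expected fields first, then re-derive the checksum to catch corrupted headers.
    fn validate(&self, s3_key: &str, etag: &str, block_idx: u64) -> Result<(), AccessError> {
        if s3_key != self.s3_key || etag != self.etag || block_idx != self.block_idx {
            return Err(AccessError::FieldMismatchError);
        }
        if Self::compute_checksum(block_idx, etag, s3_key) != self.header_checksum {
            return Err(AccessError::ChecksumError);
        }
        Ok(())
    }
}

fn main() {
    let header = Header::new(0, "etag-123".into(), "my/object/key".into());
    assert!(header.validate("my/object/key", "etag-123", 0).is_ok());
    assert!(matches!(
        header.validate("other/key", "etag-123", 0),
        Err(AccessError::FieldMismatchError)
    ));
    println!("header validated as expected");
}

The ordering mirrors the validate method in the patches: a mismatch of the expected fields is reported separately from checksum corruption, so serving a block for the wrong object or version is distinguishable from on-disk corruption of the header itself.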