diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs
index 9ffc2e56f..1ff408d88 100644
--- a/mountpoint-s3/src/data_cache.rs
+++ b/mountpoint-s3/src/data_cache.rs
@@ -31,6 +31,8 @@ pub enum DataCacheError {
IoFailure(#[from] std::io::Error),
#[error("Block content was not valid/readable")]
InvalidBlockContent,
+ #[error("Block offset does not match block index")]
+ InvalidBlockOffset,
#[error("Error while trying to evict cache content")]
EvictionFailure,
}
@@ -45,10 +47,21 @@ pub trait DataCache {
/// Get block of data from the cache for the given [CacheKey] and [BlockIndex], if available.
///
/// Operation may fail due to errors, or return [None] if the block was not available in the cache.
- fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>>;
+ fn get_block(
+ &self,
+ cache_key: &CacheKey,
+ block_idx: BlockIndex,
+ block_offset: u64,
+ ) -> DataCacheResult<Option<ChecksummedBytes>>;
/// Put block of data to the cache for the given [CacheKey] and [BlockIndex].
- fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()>;
+ fn put_block(
+ &self,
+ cache_key: CacheKey,
+ block_idx: BlockIndex,
+ block_offset: u64,
+ bytes: ChecksummedBytes,
+ ) -> DataCacheResult<()>;
/// Returns the block size for the data cache.
fn block_size(&self) -> u64;
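The trait now threads the block's byte offset through both operations so implementations can cross-check it against the index. Below is a minimal sketch of a conforming implementation, assuming the invariant the concrete caches in this change enforce; `NoOpDataCache` is a hypothetical name used for illustration only:

```rust
/// Hypothetical cache that stores nothing but still enforces the new
/// invariant: block_offset must equal block_idx * block_size.
struct NoOpDataCache {
    block_size: u64,
}

impl DataCache for NoOpDataCache {
    fn get_block(
        &self,
        _cache_key: &CacheKey,
        block_idx: BlockIndex,
        block_offset: u64,
    ) -> DataCacheResult<Option<ChecksummedBytes>> {
        if block_offset != block_idx * self.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        Ok(None) // nothing is ever cached
    }

    fn put_block(
        &self,
        _cache_key: CacheKey,
        block_idx: BlockIndex,
        block_offset: u64,
        _bytes: ChecksummedBytes,
    ) -> DataCacheResult<()> {
        if block_offset != block_idx * self.block_size {
            return Err(DataCacheError::InvalidBlockOffset);
        }
        Ok(()) // drop the block silently
    }

    fn block_size(&self) -> u64 {
        self.block_size
    }
}
```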
diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs
index d100ce8f7..e7d37a7f2 100644
--- a/mountpoint-s3/src/data_cache/disk_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -48,8 +48,10 @@ pub enum CacheLimit {
#[derive(Serialize, Deserialize, Debug)]
struct DiskBlockHeader {
block_idx: BlockIndex,
+ block_offset: u64,
etag: String,
s3_key: String,
+ data_checksum: u32,
header_checksum: u32,
}
@@ -72,37 +74,58 @@ enum DiskBlockAccessError {
impl DiskBlockHeader {
/// Creates a new [DiskBlockHeader]
- pub fn new(block_idx: BlockIndex, etag: String, s3_key: String) -> Self {
- let header_checksum = Self::compute_checksum(block_idx, &etag, &s3_key).value();
+ pub fn new(block_idx: BlockIndex, block_offset: u64, etag: String, s3_key: String, data_checksum: Crc32c) -> Self {
+ let data_checksum = data_checksum.value();
+ let header_checksum = Self::compute_checksum(block_idx, block_offset, &etag, &s3_key, data_checksum).value();
DiskBlockHeader {
block_idx,
+ block_offset,
etag,
s3_key,
+ data_checksum,
header_checksum,
}
}
- fn compute_checksum(block_idx: BlockIndex, etag: &str, s3_key: &str) -> Crc32c {
+ fn compute_checksum(
+ block_idx: BlockIndex,
+ block_offset: u64,
+ etag: &str,
+ s3_key: &str,
+ data_checksum: u32,
+ ) -> Crc32c {
let mut hasher = crc32c::Hasher::new();
- hasher.update(block_idx.to_be_bytes().as_ref());
+ hasher.update(&block_idx.to_be_bytes());
+ hasher.update(&block_offset.to_be_bytes());
hasher.update(etag.as_bytes());
hasher.update(s3_key.as_bytes());
+ hasher.update(&data_checksum.to_be_bytes());
hasher.finalize()
}
- /// Validate the integrity of the contained data.
+ /// Validate the integrity of the contained data and return the stored data checksum.
///
/// Execute this method before acting on the data contained within.
- pub fn validate(&self, s3_key: &str, etag: &str, block_idx: BlockIndex) -> Result<(), DiskBlockAccessError> {
+ pub fn validate(
+ &self,
+ s3_key: &str,
+ etag: &str,
+ block_idx: BlockIndex,
+ block_offset: u64,
+ ) -> Result<Crc32c, DiskBlockAccessError> {
let s3_key_match = s3_key == self.s3_key;
let etag_match = etag == self.etag;
let block_idx_match = block_idx == self.block_idx;
+ let block_offset_match = block_offset == self.block_offset;
- if s3_key_match && etag_match && block_idx_match {
- if Self::compute_checksum(block_idx, etag, s3_key).value() != self.header_checksum {
+ let data_checksum = self.data_checksum;
+ if s3_key_match && etag_match && block_idx_match && block_offset_match {
+ if Self::compute_checksum(block_idx, block_offset, etag, s3_key, data_checksum).value()
+ != self.header_checksum
+ {
Err(DiskBlockAccessError::ChecksumError)
} else {
- Ok(())
+ Ok(Crc32c::new(data_checksum))
}
} else {
error!(
@@ -121,8 +144,6 @@ struct DiskBlock {
header: DiskBlockHeader,
/// Cached bytes
data: Bytes,
- /// Checksum over the bytes in `data`
- data_checksum: u32,
}
impl DiskBlock {
@@ -133,28 +154,32 @@ impl DiskBlock {
fn new(
cache_key: CacheKey,
block_idx: BlockIndex,
+ block_offset: u64,
bytes: ChecksummedBytes,
) -> Result<Self, DiskBlockCreationError> {
let (s3_key, etag) = (cache_key.s3_key, cache_key.etag.into_inner());
- let header = DiskBlockHeader::new(block_idx, etag, s3_key);
-
let (data, data_checksum) = bytes.into_inner()?;
- let data_checksum = data_checksum.value();
+ let header = DiskBlockHeader::new(block_idx, block_offset, etag, s3_key, data_checksum);
- Ok(DiskBlock {
- data,
- data_checksum,
- header,
- })
+ Ok(DiskBlock { data, header })
}
/// Extract the block data, checking that fields such as S3 key, etc. match what we expect.
///
/// Comparing these fields helps ensure we have not corrupted or swapped block data on disk.
- fn data(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> Result<ChecksummedBytes, DiskBlockAccessError> {
- self.header
- .validate(cache_key.s3_key.as_str(), cache_key.etag.as_str(), block_idx)?;
- let bytes = ChecksummedBytes::new(self.data.clone(), Crc32c::new(self.data_checksum));
+ fn data(
+ &self,
+ cache_key: &CacheKey,
+ block_idx: BlockIndex,
+ block_offset: u64,
+ ) -> Result<ChecksummedBytes, DiskBlockAccessError> {
+ let data_checksum = self.header.validate(
+ cache_key.s3_key.as_str(),
+ cache_key.etag.as_str(),
+ block_idx,
+ block_offset,
+ )?;
+ let bytes = ChecksummedBytes::new(self.data.clone(), data_checksum);
Ok(bytes)
}
}
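The header checksum now also covers the block offset and the data checksum, so the write and read paths must feed the hasher identical fields in identical order. A standalone, hedged restatement of `compute_checksum` using the same hasher this file already uses (`header_checksum` is a hypothetical free function for illustration):

```rust
// The field order and big-endian encoding are effectively part of the
// on-disk format: changing either invalidates every existing cache block.
fn header_checksum(
    block_idx: u64,
    block_offset: u64,
    etag: &str,
    s3_key: &str,
    data_checksum: u32,
) -> u32 {
    let mut hasher = crc32c::Hasher::new();
    hasher.update(&block_idx.to_be_bytes()); // 8 bytes, big-endian
    hasher.update(&block_offset.to_be_bytes()); // 8 bytes, big-endian
    hasher.update(etag.as_bytes());
    hasher.update(s3_key.as_bytes());
    hasher.update(&data_checksum.to_be_bytes()); // CRC32C of the block data
    hasher.finalize().value()
}
```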
@@ -186,6 +211,7 @@ impl DiskDataCache {
path: impl AsRef<Path>,
cache_key: &CacheKey,
block_idx: BlockIndex,
+ block_offset: u64,
) -> DataCacheResult<Option<ChecksummedBytes>> {
let mut file = match fs::File::open(path.as_ref()) {
Ok(file) => file,
@@ -210,11 +236,13 @@ impl DiskDataCache {
return Err(DataCacheError::InvalidBlockContent);
}
};
- let bytes = block.data(cache_key, block_idx).map_err(|err| match err {
- DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => {
- DataCacheError::InvalidBlockContent
- }
- })?;
+ let bytes = block
+ .data(cache_key, block_idx, block_offset)
+ .map_err(|err| match err {
+ DiskBlockAccessError::ChecksumError | DiskBlockAccessError::FieldMismatchError => {
+ DataCacheError::InvalidBlockContent
+ }
+ })?;
Ok(Some(bytes))
}
@@ -290,10 +318,18 @@ fn hash_cache_key_raw(cache_key: &CacheKey) -> [u8; 32] {
}
impl DataCache for DiskDataCache {
- fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>> {
+ fn get_block(
+ &self,
+ cache_key: &CacheKey,
+ block_idx: BlockIndex,
+ block_offset: u64,
+ ) -> DataCacheResult<Option<ChecksummedBytes>> {
+ if block_offset != block_idx * self.block_size {
+ return Err(DataCacheError::InvalidBlockOffset);
+ }
let block_key = DiskBlockKey::new(cache_key, block_idx);
let path = self.get_path_for_block_key(&block_key);
- match self.read_block(&path, cache_key, block_idx) {
+ match self.read_block(&path, cache_key, block_idx, block_offset) {
Ok(None) => Ok(None),
Ok(Some(bytes)) => {
if let Some(usage) = &self.usage {
@@ -315,12 +351,22 @@ impl DataCache for DiskDataCache {
}
}
- fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> {
+ fn put_block(
+ &self,
+ cache_key: CacheKey,
+ block_idx: BlockIndex,
+ block_offset: u64,
+ bytes: ChecksummedBytes,
+ ) -> DataCacheResult<()> {
+ if block_offset != block_idx * self.block_size {
+ return Err(DataCacheError::InvalidBlockOffset);
+ }
+
let block_key = DiskBlockKey::new(&cache_key, block_idx);
let path = self.get_path_for_block_key(&block_key);
trace!(?cache_key, ?path, "new block will be created in disk cache");
- let block = DiskBlock::new(cache_key, block_idx, bytes).map_err(|err| match err {
+ let block = DiskBlock::new(cache_key, block_idx, block_offset, bytes).map_err(|err| match err {
DiskBlockCreationError::IntegrityError(_e) => DataCacheError::InvalidBlockContent,
})?;
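From a caller's perspective, the guard turns an index/offset mismatch into an immediate error instead of silent cache corruption. A hedged sketch mirroring the test setup in this file:

```rust
// Illustrative only; setup matches the tests below.
let block_size: u64 = 8 * 1024 * 1024;
let cache_dir = tempfile::tempdir().unwrap();
let cache = DiskDataCache::new(cache_dir.into_path(), block_size, CacheLimit::Unbounded);
let key = CacheKey { s3_key: "a".into(), etag: ETag::for_tests() };

// OK: offset matches index * block_size (a miss returns Ok(None)).
assert!(cache.get_block(&key, 1, block_size).is_ok());
// Rejected before touching the disk: offset does not match the index.
assert!(matches!(
    cache.get_block(&key, 1, 0),
    Err(DataCacheError::InvalidBlockOffset)
));
```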
@@ -443,11 +489,11 @@ mod tests {
etag: ETag::for_tests(),
s3_key: String::from("hello-world"),
};
- let block = DiskBlock::new(cache_key, 100, data).expect("should success as data checksum is valid");
+ let block = DiskBlock::new(cache_key, 100, 100 * 10, data).expect("should succeed as data checksum is valid");
let expected_bytes: Vec<u8> = vec![
- 100, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101, 116, 97, 103, 11, 0, 0, 0,
- 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 144, 51, 75, 183, 3, 0, 0, 0, 0, 0, 0, 0,
- 70, 111, 111, 9, 85, 128, 46,
+ 100, 0, 0, 0, 0, 0, 0, 0, 232, 3, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 116, 101, 115, 116, 95, 101,
+ 116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 9, 85, 128,
+ 46, 29, 32, 6, 192, 3, 0, 0, 0, 0, 0, 0, 0, 70, 111, 111,
];
let serialized_bytes = bincode::serialize(&block).unwrap();
assert_eq!(
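As a reading aid, the new expected bytes decompose field by field (bincode with little-endian integers and u64 length prefixes); the breakdown below is inferred from the struct definitions above:

```rust
// Equivalent construction of the expected bytes, annotated per field.
let expected: Vec<u8> = [
    &100u64.to_le_bytes()[..], // header.block_idx = 100
    &1000u64.to_le_bytes()[..], // header.block_offset = 100 * 10
    &9u64.to_le_bytes()[..], b"test_etag", // header.etag, length-prefixed
    &11u64.to_le_bytes()[..], b"hello-world", // header.s3_key, length-prefixed
    &[9, 85, 128, 46], // header.data_checksum (u32, CRC32C of "Foo")
    &[29, 32, 6, 192], // header.header_checksum (u32)
    &3u64.to_le_bytes()[..], b"Foo", // data, length-prefixed
]
.concat();
```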
@@ -502,8 +548,9 @@ mod tests {
let data_2 = ChecksummedBytes::from_bytes("Bar".into());
let data_3 = ChecksummedBytes::from_bytes("Baz".into());
+ let block_size = 8 * 1024 * 1024;
let cache_directory = tempfile::tempdir().unwrap();
- let cache = DiskDataCache::new(cache_directory.into_path(), 8 * 1024 * 1024, CacheLimit::Unbounded);
+ let cache = DiskDataCache::new(cache_directory.into_path(), block_size, CacheLimit::Unbounded);
let cache_key_1 = CacheKey {
s3_key: "a".into(),
etag: ETag::for_tests(),
@@ -513,7 +560,7 @@ mod tests {
etag: ETag::for_tests(),
};
- let block = cache.get_block(&cache_key_1, 0).expect("cache should be accessible");
+ let block = cache.get_block(&cache_key_1, 0, 0).expect("cache should be accessible");
assert!(
block.is_none(),
"no entry should be available to return but got {:?}",
@@ -522,10 +569,10 @@ mod tests {
// PUT and GET, OK?
cache
- .put_block(cache_key_1.clone(), 0, data_1.clone())
+ .put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.expect("cache should be accessible");
let entry = cache
- .get_block(&cache_key_1, 0)
+ .get_block(&cache_key_1, 0, 0)
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -535,10 +582,10 @@ mod tests {
// PUT AND GET a second file, OK?
cache
- .put_block(cache_key_2.clone(), 0, data_2.clone())
+ .put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.expect("cache should be accessible");
let entry = cache
- .get_block(&cache_key_2, 0)
+ .get_block(&cache_key_2, 0, 0)
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -548,10 +595,10 @@ mod tests {
// PUT AND GET a second block in a cache entry, OK?
cache
- .put_block(cache_key_1.clone(), 1, data_3.clone())
+ .put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.expect("cache should be accessible");
let entry = cache
- .get_block(&cache_key_1, 1)
+ .get_block(&cache_key_1, 1, block_size)
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -561,7 +608,7 @@ mod tests {
// Entry 1's first block still intact
let entry = cache
- .get_block(&cache_key_1, 0)
+ .get_block(&cache_key_1, 0, 0)
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -583,10 +630,10 @@ mod tests {
};
cache
- .put_block(cache_key.clone(), 0, slice.clone())
+ .put_block(cache_key.clone(), 0, 0, slice.clone())
.expect("cache should be accessible");
let entry = cache
- .get_block(&cache_key, 0)
+ .get_block(&cache_key, 0, 0)
.expect("cache should be accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -598,6 +645,11 @@ mod tests {
#[test]
fn test_eviction() {
+ const BLOCK_SIZE: usize = 100 * 1024;
+ const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
+ const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
+ const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;
+
fn create_random(seed: u64, size: usize) -> ChecksummedBytes {
let mut rng = ChaCha20Rng::seed_from_u64(seed);
let mut body = vec![0u8; size];
@@ -613,7 +665,7 @@ mod tests {
expected_bytes: &ChecksummedBytes,
) -> bool {
if let Some(retrieved) = cache
- .get_block(cache_key, block_idx)
+ .get_block(cache_key, block_idx, block_idx * (BLOCK_SIZE) as u64)
.expect("cache should be accessible")
{
assert_eq!(
@@ -629,11 +681,6 @@ mod tests {
}
}
- const BLOCK_SIZE: usize = 100 * 1024;
- const LARGE_OBJECT_SIZE: usize = 1024 * 1024;
- const SMALL_OBJECT_SIZE: usize = LARGE_OBJECT_SIZE / 2;
- const CACHE_LIMIT: usize = LARGE_OBJECT_SIZE;
-
let large_object = create_random(0x12345678, LARGE_OBJECT_SIZE);
let large_object_blocks: Vec<_> = (0..large_object.len())
.step_by(BLOCK_SIZE)
@@ -664,14 +711,24 @@ mod tests {
// Put all of large_object
for (block_idx, bytes) in large_object_blocks.iter().enumerate() {
cache
- .put_block(large_object_key.clone(), block_idx as u64, bytes.clone())
+ .put_block(
+ large_object_key.clone(),
+ block_idx as u64,
+ (block_idx * BLOCK_SIZE) as u64,
+ bytes.clone(),
+ )
.unwrap();
}
// Put all of small_object
for (block_idx, bytes) in small_object_blocks.iter().enumerate() {
cache
- .put_block(small_object_key.clone(), block_idx as u64, bytes.clone())
+ .put_block(
+ small_object_key.clone(),
+ block_idx as u64,
+ (block_idx * BLOCK_SIZE) as u64,
+ bytes.clone(),
+ )
.unwrap();
}
@@ -714,49 +771,67 @@ mod tests {
etag: ETag::from_str("badetag").unwrap(),
};
- let block = DiskBlock::new(cache_key_1.clone(), 0, data_1.clone()).expect("should have no checksum err");
+ let block = DiskBlock::new(cache_key_1.clone(), 0, 0, data_1.clone()).expect("should have no checksum err");
block
- .data(&cache_key_1, 1)
+ .data(&cache_key_1, 1, 0)
.expect_err("should fail due to incorrect block index");
block
- .data(&cache_key_2, 0)
+ .data(&cache_key_1, 0, 1024)
+ .expect_err("should fail due to incorrect block offset");
+ block
+ .data(&cache_key_2, 0, 0)
.expect_err("should fail due to incorrect s3 key in cache key");
block
- .data(&cache_key_3, 0)
+ .data(&cache_key_3, 0, 0)
.expect_err("should fail due to incorrect etag in cache key");
- let unpacked_bytes = block.data(&cache_key_1, 0).expect("should be OK as all fields match");
+ let unpacked_bytes = block
+ .data(&cache_key_1, 0, 0)
+ .expect("should be OK as all fields match");
assert_eq!(data_1, unpacked_bytes, "data block should return original bytes");
}
#[test]
fn validate_block_header() {
let block_idx = 0;
+ let block_offset = 0;
let etag = ETag::for_tests();
let s3_key = String::from("s3/key");
- let mut header = DiskBlockHeader::new(block_idx, etag.as_str().to_owned(), s3_key.clone());
+ let data_checksum = Crc32c::new(42);
+ let mut header = DiskBlockHeader::new(
+ block_idx,
+ block_offset,
+ etag.as_str().to_owned(),
+ s3_key.clone(),
+ data_checksum,
+ );
- header
- .validate(&s3_key, etag.as_str(), block_idx)
+ let checksum = header
+ .validate(&s3_key, etag.as_str(), block_idx, block_offset)
.expect("should be OK with valid fields and checksum");
+ assert_eq!(data_checksum, checksum);
// Bad fields
let err = header
- .validate("hello", etag.as_str(), block_idx)
+ .validate("hello", etag.as_str(), block_idx, block_offset)
.expect_err("should fail with invalid s3_key");
assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
let err = header
- .validate(&s3_key, "bad etag", block_idx)
+ .validate(&s3_key, "bad etag", block_idx, block_offset)
.expect_err("should fail with invalid etag");
assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
let err = header
- .validate(&s3_key, etag.as_str(), 5)
+ .validate(&s3_key, etag.as_str(), 5, block_offset)
.expect_err("should fail with invalid block idx");
assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
+ let err = header
+ .validate(&s3_key, etag.as_str(), block_idx, 1024)
+ .expect_err("should fail with invalid block offset");
+ assert!(matches!(err, DiskBlockAccessError::FieldMismatchError));
// Bad checksum
header.header_checksum = 23;
let err = header
- .validate(&s3_key, etag.as_str(), block_idx)
+ .validate(&s3_key, etag.as_str(), block_idx, block_offset)
.expect_err("should fail with invalid checksum");
assert!(matches!(err, DiskBlockAccessError::ChecksumError));
}
diff --git a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
index eef1872bd..5bc0fe9b1 100644
--- a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
@@ -3,7 +3,7 @@
use std::collections::HashMap;
use std::default::Default;
-use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheResult};
+use super::{BlockIndex, CacheKey, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult};
use crate::sync::RwLock;
/// Simple in-memory (RAM) implementation of [DataCache]. Recommended for use in testing only.
@@ -23,13 +23,30 @@ impl InMemoryDataCache {
}
impl DataCache for InMemoryDataCache {
- fn get_block(&self, cache_key: &CacheKey, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>> {
+ fn get_block(
+ &self,
+ cache_key: &CacheKey,
+ block_idx: BlockIndex,
+ block_offset: u64,
+ ) -> DataCacheResult<Option<ChecksummedBytes>> {
+ if block_offset != block_idx * self.block_size {
+ return Err(DataCacheError::InvalidBlockOffset);
+ }
let data = self.data.read().unwrap();
let block_data = data.get(cache_key).and_then(|blocks| blocks.get(&block_idx)).cloned();
Ok(block_data)
}
- fn put_block(&self, cache_key: CacheKey, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> {
+ fn put_block(
+ &self,
+ cache_key: CacheKey,
+ block_idx: BlockIndex,
+ block_offset: u64,
+ bytes: ChecksummedBytes,
+ ) -> DataCacheResult<()> {
+ if block_offset != block_idx * self.block_size {
+ return Err(DataCacheError::InvalidBlockOffset);
+ }
let mut data = self.data.write().unwrap();
let blocks = data.entry(cache_key).or_default();
blocks.insert(block_idx, bytes);
@@ -57,7 +74,8 @@ mod tests {
let data_3 = Bytes::from_static(b"Baz");
let data_3 = ChecksummedBytes::from_bytes(data_3.clone());
- let cache = InMemoryDataCache::new(8 * 1024 * 1024);
+ let block_size = 8 * 1024 * 1024;
+ let cache = InMemoryDataCache::new(block_size);
let cache_key_1 = CacheKey {
s3_key: "a".into(),
etag: ETag::for_tests(),
@@ -67,7 +85,7 @@ mod tests {
etag: ETag::for_tests(),
};
- let block = cache.get_block(&cache_key_1, 0).expect("cache is accessible");
+ let block = cache.get_block(&cache_key_1, 0, 0).expect("cache is accessible");
assert!(
block.is_none(),
"no entry should be available to return but got {:?}",
@@ -76,10 +94,10 @@ mod tests {
// PUT and GET, OK?
cache
- .put_block(cache_key_1.clone(), 0, data_1.clone())
+ .put_block(cache_key_1.clone(), 0, 0, data_1.clone())
.expect("cache is accessible");
let entry = cache
- .get_block(&cache_key_1, 0)
+ .get_block(&cache_key_1, 0, 0)
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -89,10 +107,10 @@ mod tests {
// PUT AND GET a second file, OK?
cache
- .put_block(cache_key_2.clone(), 0, data_2.clone())
+ .put_block(cache_key_2.clone(), 0, 0, data_2.clone())
.expect("cache is accessible");
let entry = cache
- .get_block(&cache_key_2, 0)
+ .get_block(&cache_key_2, 0, 0)
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -102,10 +120,10 @@ mod tests {
// PUT AND GET a second block in a cache entry, OK?
cache
- .put_block(cache_key_1.clone(), 1, data_3.clone())
+ .put_block(cache_key_1.clone(), 1, block_size, data_3.clone())
.expect("cache is accessible");
let entry = cache
- .get_block(&cache_key_1, 1)
+ .get_block(&cache_key_1, 1, block_size)
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
@@ -115,7 +133,7 @@ mod tests {
// Entry 1's first block still intact
let entry = cache
- .get_block(&cache_key_1, 0)
+ .get_block(&cache_key_1, 0, 0)
.expect("cache is accessible")
.expect("cache entry should be returned");
assert_eq!(
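Because the in-memory cache enforces the same offset guard as the disk cache, generic callers can derive the offset once from the trait's own `block_size`. A hedged sketch of such a helper (`read_cached_block` is hypothetical, not part of this change):

```rust
/// Hypothetical convenience wrapper: computing the offset from the index
/// makes it impossible for a caller to pass a mismatched pair.
fn read_cached_block<C: DataCache>(
    cache: &C,
    key: &CacheKey,
    block_idx: BlockIndex,
) -> DataCacheResult<Option<ChecksummedBytes>> {
    let block_offset = block_idx * cache.block_size();
    cache.get_block(key, block_idx, block_offset)
}
```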
diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs
index 7de69768c..2ea197f73 100644
--- a/mountpoint-s3/src/prefetch/caching_stream.rs
+++ b/mountpoint-s3/src/prefetch/caching_stream.rs
@@ -118,13 +118,15 @@ where
// We could check for missing blocks in advance and pre-emptively start a GetObject
// request, but since this stream is already behind the prefetcher, the delay is
// already likely negligible.
+ let mut block_offset = block_range.start * block_size;
for block_index in block_range.clone() {
- match self.cache.get_block(&self.cache_key, block_index) {
+ match self.cache.get_block(&self.cache_key, block_index, block_offset) {
Ok(Some(block)) => {
trace!(?key, ?range, block_index, "cache hit");
- let part = self.make_part(block, block_index, &range);
+ let part = self.make_part(block, block_index, block_offset, &range);
metrics::counter!("cache.total_bytes", part.len() as u64, "type" => "read");
self.part_queue_producer.push(Ok(part));
+ block_offset += block_size;
continue;
}
Ok(None) => trace!(?key, ?range, block_index, "cache miss - no data for block"),
@@ -132,7 +134,7 @@ where
}
// If a block is uncached or reading it fails, fallback to S3 for the rest of the stream.
return self
- .get_from_client(range.trim_start(block_index * block_size), block_index..block_range.end)
+ .get_from_client(range.trim_start(block_offset), block_index..block_range.end)
.await;
}
}
@@ -212,7 +214,7 @@ where
// We have a full block: write it to the cache, send it to the queue, and flush the buffer.
self.update_cache(block_index, block_offset, &buffer);
self.part_queue_producer
- .push(Ok(self.make_part(buffer, block_index, &range)));
+ .push(Ok(self.make_part(buffer, block_index, block_offset, &range)));
block_index += 1;
block_offset += block_size;
buffer = ChecksummedBytes::default();
@@ -236,7 +238,7 @@ where
// Write the last block to the cache.
self.update_cache(block_index, block_offset, &buffer);
self.part_queue_producer
- .push(Ok(self.make_part(buffer, block_index, &range)));
+ .push(Ok(self.make_part(buffer, block_index, block_offset, &range)));
}
break;
}
@@ -248,13 +250,11 @@ where
fn update_cache(&self, block_index: u64, block_offset: u64, block: &ChecksummedBytes) {
// TODO: consider updating the cache asynchronously
let start = Instant::now();
- match self.cache.put_block(self.cache_key.clone(), block_index, block.clone()) {
+ match self
+ .cache
+ .put_block(self.cache_key.clone(), block_index, block_offset, block.clone())
+ {
Ok(()) => {
- assert_eq!(
- block_offset,
- block_index * self.cache.block_size(),
- "invalid block offset"
- );
metrics::histogram!("cache.write_duration_us", start.elapsed().as_micros() as f64);
metrics::counter!("cache.total_bytes", block.len() as u64, "type" => "write");
}
@@ -266,9 +266,14 @@ where
/// Creates a Part that can be streamed to the prefetcher from the given cache block.
/// If required, trims the block bytes to the request range.
- fn make_part(&self, block: ChecksummedBytes, block_index: u64, range: &RequestRange) -> Part {
+ fn make_part(&self, block: ChecksummedBytes, block_index: u64, block_offset: u64, range: &RequestRange) -> Part {
+ assert_eq!(
+ block_offset,
+ block_index * self.cache.block_size(),
+ "invalid block offset"
+ );
+
let key = &self.cache_key.s3_key;
- let block_offset = block_index * self.cache.block_size();
let block_size = block.len();
let part_range = range
.trim_start(block_offset)
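Taken together, the caching_stream changes keep a single `block_offset` cursor that is (1) validated by the cache, (2) re-asserted in `make_part`, and (3) the exact resume point for the S3 fallback on the first miss. A condensed, hedged restatement of the new loop:

```rust
// Condensed from the hunks above; surrounding async context is assumed.
let mut block_offset = block_range.start * block_size;
for block_index in block_range.clone() {
    match self.cache.get_block(&self.cache_key, block_index, block_offset) {
        Ok(Some(block)) => {
            // Cache hit: stream the part, then advance to the next boundary.
            let part = self.make_part(block, block_index, block_offset, &range);
            self.part_queue_producer.push(Ok(part));
            block_offset += block_size;
        }
        // Miss or read error: fall back to S3 from exactly this byte onward.
        _ => {
            return self
                .get_from_client(range.trim_start(block_offset), block_index..block_range.end)
                .await;
        }
    }
}
```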