Skip to content

Commit

Permalink
Add new DataCache trait and InMemoryDataCache implementation (#557)
Browse files Browse the repository at this point in the history
* Add new DataCache trait and InMemoryDataCache implementation

Signed-off-by: Daniel Carl Jones <[email protected]>

* Replace indices_for_byte_range with block_size, moving responsibility to caller

Signed-off-by: Daniel Carl Jones <[email protected]>

* Implement PartialEq for ChecksummedBytes when testing, replacing assert_eq_checksummed_bytes macro

Signed-off-by: Daniel Carl Jones <[email protected]>

* Update DataCache to use interior mutability

Signed-off-by: Daniel Carl Jones <[email protected]>

* Fix assert_eq! issue (& and &&)

Signed-off-by: Daniel Carl Jones <[email protected]>

* Update error types

Signed-off-by: Daniel Carl Jones <[email protected]>

* Update struct/module visibility

Signed-off-by: Daniel Carl Jones <[email protected]>

---------

Signed-off-by: Daniel Carl Jones <[email protected]>
  • Loading branch information
dannycjones authored Oct 23, 2023
1 parent dbfa87c commit cb0d26b
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 1 deletion.
54 changes: 54 additions & 0 deletions mountpoint-s3/src/data_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Traits and types for data caching.
//!
//! The data cache aims to reduce repeated fetches of S3 object content,
//! reducing both the number of requests as well as the latency for the reads.
//! Ultimately, this means reduced cost in terms of S3 billing as well as compute time.
pub mod in_memory_data_cache;

use std::ops::Range;

use thiserror::Error;

pub use crate::prefetch::checksummed_bytes::ChecksummedBytes;

/// Indexes blocks within a given object.
///
/// The cache is divided into equally sized blocks (see [DataCache::block_size]);
/// index 0 addresses the block at the start of the object.
pub type BlockIndex = u64;

/// Errors returned by operations on a [DataCache]
#[derive(Debug, Error)]
pub enum DataCacheError {
    /// An underlying I/O failure occurred while reading from or writing to the cache medium.
    #[error("IO error when reading or writing from cache: {0}")]
    IoFailure(#[from] std::io::Error),
    /// A cache entry was found, but its content could not be read or was not valid.
    #[error("Block content was not valid/readable")]
    InvalidBlockContent,
}

/// Convenience alias for results of [DataCache] operations.
pub type DataCacheResult<Value> = Result<Value, DataCacheError>;

/// Cache data with a checksum identified by some [Key].
///
/// The underlying cache is divided into blocks of equal size.
///
/// TODO: Deletion and eviction of cache entries.
/// TODO: Some version information (ETag) independent from [Key] to allow smarter eviction?
pub trait DataCache<Key> {
    /// Get block of data from the cache for the given [Key] and [BlockIndex], if available.
    ///
    /// Operation may fail due to errors, or return [None] if the block was not available in the cache.
    fn get_block(&self, cache_key: &Key, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>>;

    /// Put block of data to the cache for the given [Key] and [BlockIndex].
    fn put_block(&self, cache_key: Key, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()>;

    /// Returns the block size for the data cache, in bytes.
    fn block_size(&self) -> u64;

    /// Returns which blocks from the given range of block indices are present in the cache.
    /// Indices in the returned vector are already sorted.
    ///
    /// It is possible that the **blocks may be deleted before reading**, or may be corrupted or inaccessible.
    /// This method only indicates that a cache entry was present at the time of calling.
    /// There is no guarantee that the data will still be available at the time of reading.
    fn cached_block_indices(&self, cache_key: &Key, range: Range<BlockIndex>) -> DataCacheResult<Vec<BlockIndex>>;
}
168 changes: 168 additions & 0 deletions mountpoint-s3/src/data_cache/in_memory_data_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
//! Module for the in-memory data cache implementation used for testing.
use std::collections::HashMap;
use std::default::Default;
use std::hash::Hash;
use std::ops::Range;

use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};
use crate::sync::RwLock;

/// Simple in-memory (RAM) implementation of [DataCache]. Recommended for use in testing only.
pub struct InMemoryDataCache<CacheKey> {
    /// Cached blocks, keyed first by cache key and then by block index within the object.
    /// Wrapped in an [RwLock] so the trait's `&self` methods can mutate it (interior mutability).
    data: RwLock<HashMap<CacheKey, HashMap<BlockIndex, ChecksummedBytes>>>,
    /// Size of each block in bytes, as reported by [DataCache::block_size].
    block_size: u64,
}

impl<Key> InMemoryDataCache<Key> {
/// Create a new instance of an [InMemoryDataCache] with the specified `block_size`.
pub fn new(block_size: u64) -> Self {
InMemoryDataCache {
data: Default::default(),
block_size,
}
}
}

impl<Key: Eq + Hash> DataCache<Key> for InMemoryDataCache<Key> {
    fn get_block(&self, cache_key: &Key, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>> {
        // A read lock suffices: lookups never mutate the map.
        let guard = self.data.read().unwrap();
        let block_data = match guard.get(cache_key) {
            Some(blocks) => blocks.get(&block_idx).cloned(),
            None => None,
        };
        Ok(block_data)
    }

    fn put_block(&self, cache_key: Key, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> {
        let mut guard = self.data.write().unwrap();
        // Create the per-key block map on first insert for this key.
        guard.entry(cache_key).or_default().insert(block_idx, bytes);
        Ok(())
    }

    fn block_size(&self) -> u64 {
        self.block_size
    }

    fn cached_block_indices(&self, cache_key: &Key, range: Range<BlockIndex>) -> DataCacheResult<Vec<BlockIndex>> {
        let guard = self.data.read().unwrap();
        // Filtering an ascending range keeps the returned indices sorted, as the trait requires.
        let indices = guard
            .get(cache_key)
            .map(|blocks| range.filter(|idx| blocks.contains_key(idx)).collect())
            .unwrap_or_default();
        Ok(indices)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use bytes::Bytes;

    use mountpoint_s3_crt::checksums::crc32c;

    type TestCacheKey = String;

    /// Build a [ChecksummedBytes] over static data, computing its CRC32C checksum.
    fn checksummed(data: &'static [u8]) -> ChecksummedBytes {
        let bytes = Bytes::from_static(data);
        let checksum = crc32c::checksum(&bytes);
        ChecksummedBytes::new(bytes, checksum)
    }

    #[test]
    fn test_put_get() {
        let data_1 = checksummed(b"Hello world");
        let data_2 = checksummed(b"Foo bar");
        let data_3 = checksummed(b"Baz");

        // No `mut` needed: [DataCache] uses interior mutability, all methods take `&self`.
        let cache = InMemoryDataCache::new(8 * 1024 * 1024);
        let cache_key_1: TestCacheKey = String::from("a");
        let cache_key_2: TestCacheKey = String::from("b");

        // GET before any PUT should find nothing.
        let block = cache.get_block(&cache_key_1, 0).expect("cache is accessible");
        assert!(
            block.is_none(),
            "no entry should be available to return but got {:?}",
            block,
        );

        // PUT and GET, OK?
        cache
            .put_block(cache_key_1.clone(), 0, data_1.clone())
            .expect("cache is accessible");
        let entry = cache
            .get_block(&cache_key_1, 0)
            .expect("cache is accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );

        // PUT AND GET a second file, OK?
        cache
            .put_block(cache_key_2.clone(), 0, data_2.clone())
            .expect("cache is accessible");
        let entry = cache
            .get_block(&cache_key_2, 0)
            .expect("cache is accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_2, entry,
            "cache entry returned should match original bytes after put"
        );

        // PUT AND GET a second block in a cache entry, OK?
        cache
            .put_block(cache_key_1.clone(), 1, data_3.clone())
            .expect("cache is accessible");
        let entry = cache
            .get_block(&cache_key_1, 1)
            .expect("cache is accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_3, entry,
            "cache entry returned should match original bytes after put"
        );

        // Entry 1's first block still intact
        let entry = cache
            .get_block(&cache_key_1, 0)
            .expect("cache is accessible")
            .expect("cache entry should be returned");
        assert_eq!(
            data_1, entry,
            "cache entry returned should match original bytes after put"
        );
    }

    #[test]
    fn test_cached_indices() {
        let data_1 = checksummed(b"Hello world");
        let data_2 = checksummed(b"Foo bar");

        // No `mut` needed: [DataCache] uses interior mutability, all methods take `&self`.
        let cache = InMemoryDataCache::new(8 * 1024 * 1024);
        let cache_key_1: TestCacheKey = String::from("a");
        let cache_key_2: TestCacheKey = String::from("b");

        // An empty cache reports no cached indices.
        let cached_indices = cache
            .cached_block_indices(&cache_key_1, 0..5)
            .expect("should not error");
        let expected: Vec<BlockIndex> = Vec::new();
        assert_eq!(cached_indices, expected);

        cache
            .put_block(cache_key_1.clone(), 2, data_1.clone())
            .expect("no reason to error, cache is accessible");
        cache
            .put_block(cache_key_1.clone(), 3, data_2.clone())
            .expect("no reason to error, cache is accessible");
        cache
            .put_block(cache_key_2.clone(), 5, data_2.clone())
            .expect("no reason to error, cache is accessible");

        // Only key 1's blocks are reported; key 2's block 5 must not leak in.
        let cached_indices = cache
            .cached_block_indices(&cache_key_1, 0..12)
            .expect("should not error");
        assert_eq!(cached_indices, vec![2, 3]);
    }
}
2 changes: 2 additions & 0 deletions mountpoint-s3/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod data_cache;
pub mod fs;
pub mod fuse;
mod inode;
Expand All @@ -8,6 +9,7 @@ pub mod prefetch;
pub mod prefix;
mod sync;
mod upload;

pub use fs::{S3Filesystem, S3FilesystemConfig};

/// Enable tracing and CRT logging when running unit tests.
Expand Down
23 changes: 22 additions & 1 deletion mountpoint-s3/src/prefetch/checksummed_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use thiserror::Error;

/// A `ChecksummedBytes` is a bytes buffer that carries its checksum.
/// The implementation guarantees that its integrity will be validated when data transformation occurs.
#[derive(Debug, Clone)]
#[derive(Clone, Debug)]
pub struct ChecksummedBytes {
    /// The original buffer that `checksum` was computed over.
    orig_bytes: Bytes,
    /// The bytes currently exposed; presumably a sub-slice view of `orig_bytes` — TODO confirm against slicing methods.
    curr_slice: Bytes,
    /// Checksum for `orig_bytes`
    checksum: Crc32c,
}

Expand Down Expand Up @@ -123,6 +124,26 @@ pub enum IntegrityError {
#[error("Checksum mismatch. expected: {0:?}, actual: {1:?}")]
ChecksumMismatch(Crc32c, Crc32c),
}

// Equality is implemented for tests only; comparing values will panic if either side's data is corrupted.
#[cfg(test)]
impl PartialEq for ChecksummedBytes {
    fn eq(&self, other: &Self) -> bool {
        // Visible bytes differ: definitely not equal, no validation needed.
        if self.curr_slice != other.curr_slice {
            return false;
        }

        // Fast path: identical backing buffer and checksum require no validation.
        let same_backing = self.orig_bytes == other.orig_bytes && self.checksum == other.checksum;
        if !same_backing {
            // Slow path: verify neither side is silently corrupted before declaring equality.
            self.validate().expect("should be valid");
            other.validate().expect("should be valid");
        }

        true
    }
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
Expand Down

0 comments on commit cb0d26b

Please sign in to comment.