Use new ChecksummedBlock in DataCache #572

Closed
wants to merge 3 commits into from
39 changes: 39 additions & 0 deletions mountpoint-s3/src/checksums.rs
@@ -0,0 +1,39 @@
mod block;
mod bytes;

use mountpoint_s3_crt::checksums::crc32c::Crc32c;

use thiserror::Error;

pub use block::ChecksummedBlock;
pub use bytes::ChecksummedBytes;

/// Calculates the combined checksum for `AB` where `prefix_crc` is the checksum for `A`,
/// `suffix_crc` is the checksum for `B`, and `suffix_len` is the length of `B`.
pub fn combine_checksums(prefix_crc: Crc32c, suffix_crc: Crc32c, suffix_len: usize) -> Crc32c {
let combined = ::crc32c::crc32c_combine(prefix_crc.value(), suffix_crc.value(), suffix_len);
Crc32c::new(combined)
}

#[derive(Debug, Error)]
pub enum IntegrityError {
#[error("Checksum mismatch. expected: {0:?}, actual: {1:?}")]
ChecksumMismatch(Crc32c, Crc32c),
}

#[cfg(test)]
mod tests {
use super::*;
use mountpoint_s3_crt::checksums::crc32c;

#[test]
fn test_combine_checksums() {
let buf: &[u8] = b"123456789";
let (buf1, buf2) = buf.split_at(4);
let crc = crc32c::checksum(buf);
let crc1 = crc32c::checksum(buf1);
let crc2 = crc32c::checksum(buf2);
let combined = combine_checksums(crc1, crc2, buf2.len());
assert_eq!(combined, crc);
}
}
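For readers unfamiliar with CRC combination, here is a minimal, dependency-free sketch of what `combine_checksums` computes. It is not the `crc32c` crate's implementation: `crc32c`, `combine`, and `shift_zero_bit` are illustrative names, and plain `u32` values stand in for `Crc32c`. The combine step runs in O(`suffix_len`) for clarity, whereas production implementations such as zlib's `crc32_combine` use a logarithmic-time method.

```rust
/// Reflected CRC-32C (Castagnoli) polynomial.
const POLY: u32 = 0x82F6_3B78;

/// Advance the raw CRC register as if a single zero bit had been fed in.
fn shift_zero_bit(reg: u32) -> u32 {
    if reg & 1 != 0 {
        (reg >> 1) ^ POLY
    } else {
        reg >> 1
    }
}

/// Bitwise CRC-32C: initial value and final XOR are both 0xFFFF_FFFF.
fn crc32c(data: &[u8]) -> u32 {
    let mut reg = 0xFFFF_FFFFu32;
    for &byte in data {
        reg ^= u32::from(byte);
        for _ in 0..8 {
            reg = shift_zero_bit(reg);
        }
    }
    reg ^ 0xFFFF_FFFF
}

/// Checksum of `A || B` computed from `crc(A)`, `crc(B)` and `len(B)` only.
///
/// CRCs are linear over GF(2): feeding `len(B)` zero bytes through the
/// register that holds `crc(A)` and XOR-ing in `crc(B)` yields `crc(A || B)`.
/// The bytes of `A` and `B` are never read.
fn combine(prefix_crc: u32, suffix_crc: u32, suffix_len: usize) -> u32 {
    let mut reg = prefix_crc;
    for _ in 0..suffix_len * 8 {
        reg = shift_zero_bit(reg);
    }
    reg ^ suffix_crc
}
```

With this sketch, `combine(crc32c(b"1234"), crc32c(b"56789"), 5)` equals `crc32c(b"123456789")`, mirroring `test_combine_checksums` above.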
211 changes: 211 additions & 0 deletions mountpoint-s3/src/checksums/block.rs
@@ -0,0 +1,211 @@
use bytes::{Bytes, BytesMut};
use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};

use crate::checksums::{bytes::ChecksummedBytes, combine_checksums, IntegrityError};

/// A `ChecksummedBlock` is a bytes buffer that carries its checksum.
/// The implementation guarantees that its integrity will be validated when data is accessed.
#[derive(Debug, Clone)]
pub struct ChecksummedBlock {
Member

I was wondering if it might be nicer to have just one implementation of this stuff, and give ChecksummedBytes a shrink_to_fit-style method to get the guarantee you're looking for. But then I guess that makes extend et al more complicated because you have to handle all the different combinations to decide when you can skip validating the checksums, so probably not worth it?

Contributor Author

After all, I think shrink_to_fit would be a better approach and can also be used to improve extend. I will close this PR and open a new one with that change.
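To make the `shrink_to_fit` idea concrete, here is a hypothetical sketch. The follow-up PR is not shown in this thread, so everything here is an assumption: `SlicedBuf` stands in for `ChecksummedBytes` (a checksum over the original buffer plus a visible sub-range), `Arc<[u8]>` stands in for `Bytes`, and the method name and signature are guesses. It computes the checksum of the visible slice first and validates the original buffer afterwards, so that corruption on either side of the switch is still caught.

```rust
use std::ops::Range;
use std::sync::Arc;

/// Reflected CRC-32C (Castagnoli) polynomial.
const POLY: u32 = 0x82F6_3B78;

/// Bitwise CRC-32C over `data`.
fn crc32c(data: &[u8]) -> u32 {
    let mut reg = 0xFFFF_FFFFu32;
    for &byte in data {
        reg ^= u32::from(byte);
        for _ in 0..8 {
            reg = if reg & 1 != 0 { (reg >> 1) ^ POLY } else { reg >> 1 };
        }
    }
    reg ^ 0xFFFF_FFFF
}

#[derive(Debug, PartialEq)]
struct ChecksumMismatch {
    expected: u32,
    actual: u32,
}

/// Hypothetical stand-in for `ChecksummedBytes`: `checksum` covers all of
/// `orig`, while `range` selects the currently visible slice.
#[derive(Debug, Clone)]
struct SlicedBuf {
    orig: Arc<[u8]>,
    range: Range<usize>,
    checksum: u32,
}

impl SlicedBuf {
    fn new(data: &[u8]) -> Self {
        Self { orig: Arc::from(data), range: 0..data.len(), checksum: crc32c(data) }
    }

    /// Narrow the visible range. No validation, checksum left untouched.
    fn slice(&self, range: Range<usize>) -> Self {
        let start = self.range.start + range.start;
        let end = self.range.start + range.end;
        assert!(start <= end && end <= self.range.end, "range out of bounds");
        Self { orig: Arc::clone(&self.orig), range: start..end, checksum: self.checksum }
    }

    /// Hypothetical `shrink_to_fit`: afterwards the checksum covers exactly
    /// the visible bytes, which is the guarantee a block type would give.
    fn shrink_to_fit(&mut self) -> Result<(), ChecksumMismatch> {
        if self.range == (0..self.orig.len()) {
            return Ok(()); // already covers the whole buffer
        }
        // Copy the visible slice and compute its checksum from the source...
        let (new_orig, new_checksum) = {
            let visible = &self.orig[self.range.clone()];
            (Arc::<[u8]>::from(visible), crc32c(visible))
        };
        // ...then validate the original buffer against the old checksum.
        let actual = crc32c(&self.orig);
        if actual != self.checksum {
            return Err(ChecksumMismatch { expected: self.checksum, actual });
        }
        self.range = 0..new_orig.len();
        self.orig = new_orig;
        self.checksum = new_checksum;
        Ok(())
    }
}
```

After a successful `shrink_to_fit`, two such buffers could be concatenated by combining their checksums directly, which is the case `ChecksummedBlock::extend` handles in this PR.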

bytes: Bytes,
checksum: Crc32c,
}

impl ChecksummedBlock {
pub fn new(bytes: Bytes, checksum: Crc32c) -> Self {
Self { bytes, checksum }
}

/// Create `ChecksummedBlock` from `Bytes`, calculating its checksum.
pub fn from_bytes(bytes: Bytes) -> Self {
let checksum = crc32c::checksum(&bytes);
Self::new(bytes, checksum)
}

/// Convert the `ChecksummedBlock` into `Bytes`, validating data integrity before converting.
///
/// Return `IntegrityError` on data corruption.
pub fn into_bytes(self) -> Result<Bytes, IntegrityError> {
self.validate()?;

Ok(self.bytes)
}

/// Convert into a `ChecksummedBytes`.
pub fn into_checksummed_bytes(self) -> ChecksummedBytes {
ChecksummedBytes::new(self.bytes, self.checksum)
}

/// Returns the number of bytes contained in this `ChecksummedBlock`.
pub fn len(&self) -> usize {
self.bytes.len()
}

/// Returns true if the `ChecksummedBlock` has a length of 0.
pub fn is_empty(&self) -> bool {
self.bytes.is_empty()
}

/// Append the given bytes to current `ChecksummedBlock`.
pub fn extend(&mut self, extend: ChecksummedBlock) {
if self.is_empty() {
*self = extend;
return;
}
if extend.is_empty() {
return;
}
Comment on lines +51 to +57
Member

For these cases you probably need to validate the checksum of the empty side (which will be trivial to compute because they're zero-length slices), because the length might have been corrupted.
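One way to address this is sketched below. This is not the PR's code: `Block` is a simplified stand-in using `Vec<u8>` and a `u32` checksum, `crc32c` and `combine` are illustrative bitwise helpers, and changing `extend` to return a `Result` is an assumption. Whichever side claims to be empty is validated before it is discarded, so a buffer whose length was corrupted to zero cannot be dropped silently.

```rust
/// Reflected CRC-32C (Castagnoli) polynomial.
const POLY: u32 = 0x82F6_3B78;

fn shift_zero_bit(reg: u32) -> u32 {
    if reg & 1 != 0 { (reg >> 1) ^ POLY } else { reg >> 1 }
}

/// Bitwise CRC-32C over `data`. The checksum of an empty slice is 0.
fn crc32c(data: &[u8]) -> u32 {
    let mut reg = 0xFFFF_FFFFu32;
    for &byte in data {
        reg ^= u32::from(byte);
        for _ in 0..8 {
            reg = shift_zero_bit(reg);
        }
    }
    reg ^ 0xFFFF_FFFF
}

/// Checksum of `A || B` from `crc(A)`, `crc(B)` and `len(B)` (O(len) sketch).
fn combine(prefix_crc: u32, suffix_crc: u32, suffix_len: usize) -> u32 {
    let mut reg = prefix_crc;
    for _ in 0..suffix_len * 8 {
        reg = shift_zero_bit(reg);
    }
    reg ^ suffix_crc
}

#[derive(Debug)]
struct ChecksumMismatch {
    expected: u32,
    actual: u32,
}

/// Simplified stand-in for `ChecksummedBlock`.
#[derive(Debug, Clone)]
struct Block {
    bytes: Vec<u8>,
    checksum: u32,
}

impl Block {
    fn from_bytes(bytes: &[u8]) -> Self {
        Self { bytes: bytes.to_vec(), checksum: crc32c(bytes) }
    }

    fn validate(&self) -> Result<(), ChecksumMismatch> {
        let actual = crc32c(&self.bytes);
        if actual != self.checksum {
            return Err(ChecksumMismatch { expected: self.checksum, actual });
        }
        Ok(())
    }

    /// `extend` variant that validates the empty side before discarding it.
    fn extend(&mut self, other: Block) -> Result<(), ChecksumMismatch> {
        if self.bytes.is_empty() {
            // Trivial to compute: the CRC of zero bytes.
            self.validate()?;
            *self = other;
            return Ok(());
        }
        if other.bytes.is_empty() {
            other.validate()?;
            return Ok(());
        }
        let new_checksum = combine(self.checksum, other.checksum, other.bytes.len());
        self.bytes.extend_from_slice(&other.bytes);
        self.checksum = new_checksum;
        Ok(())
    }
}
```

The extra check only runs on the empty-side paths and hashes zero bytes, so its cost is negligible.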


let total_len = self.bytes.len() + extend.len();
let mut bytes_mut = BytesMut::with_capacity(total_len);
bytes_mut.extend_from_slice(&self.bytes);
bytes_mut.extend_from_slice(&extend.bytes);
let new_bytes = bytes_mut.freeze();
let new_checksum = combine_checksums(self.checksum, extend.checksum, extend.len());
*self = ChecksummedBlock {
bytes: new_bytes,
checksum: new_checksum,
};
}
Comment on lines +49 to +69
Contributor

I think this is safe since we are taking two checksummed buffers, combining the two, and calculating the new checksum independently of the new buffer.

IMO the durability risk here is mitigated, but I'd also like a second opinion from the team.

Member

Yeah, that sounds right. We know the expected checksum of each side (unlike in the ChecksummedBytes case where we only know the checksum of some larger slice of each side), and can compute the new expected checksum from those without actually looking at the bytes.

Can you add a comment here capturing that reasoning?
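The reasoning in this thread can be illustrated with a small dependency-free sketch. Plain `u32` checksums and `Vec<u8>` stand in for `Crc32c` and `Bytes`, and `Block`, `extend`, and `is_valid` are illustrative names, not the PR's code. The code comment inside `extend` is one possible wording of the requested explanation.

```rust
/// Reflected CRC-32C (Castagnoli) polynomial.
const POLY: u32 = 0x82F6_3B78;

fn shift_zero_bit(reg: u32) -> u32 {
    if reg & 1 != 0 { (reg >> 1) ^ POLY } else { reg >> 1 }
}

/// Bitwise CRC-32C over `data`.
fn crc32c(data: &[u8]) -> u32 {
    let mut reg = 0xFFFF_FFFFu32;
    for &byte in data {
        reg ^= u32::from(byte);
        for _ in 0..8 {
            reg = shift_zero_bit(reg);
        }
    }
    reg ^ 0xFFFF_FFFF
}

/// Checksum of `A || B` from `crc(A)`, `crc(B)` and `len(B)` (O(len) sketch).
fn combine(prefix_crc: u32, suffix_crc: u32, suffix_len: usize) -> u32 {
    let mut reg = prefix_crc;
    for _ in 0..suffix_len * 8 {
        reg = shift_zero_bit(reg);
    }
    reg ^ suffix_crc
}

/// Simplified stand-in for `ChecksummedBlock`.
struct Block {
    bytes: Vec<u8>,
    checksum: u32,
}

impl Block {
    fn from_bytes(bytes: &[u8]) -> Self {
        Self { bytes: bytes.to_vec(), checksum: crc32c(bytes) }
    }
}

fn extend(left: &mut Block, right: &Block) {
    // Each side's checksum covers exactly that side's bytes, so the expected
    // checksum of the concatenation can be derived from the two expected
    // checksums alone. The new checksum is never computed from the new
    // buffer, so corruption in either input (or introduced while copying)
    // still shows up as a mismatch on the next validation.
    let expected = combine(left.checksum, right.checksum, right.bytes.len());
    left.bytes.extend_from_slice(&right.bytes);
    left.checksum = expected;
}

fn is_valid(block: &Block) -> bool {
    crc32c(&block.bytes) == block.checksum
}
```

Flipping a byte in either input before calling `extend` leaves `is_valid` returning `false` for the result, which is the behaviour `test_extend_self_corrupted` and `test_extend_other_corrupted` check for the real type.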


/// Validate data integrity in this `ChecksummedBlock`.
///
/// Return `IntegrityError` on data corruption.
pub fn validate(&self) -> Result<(), IntegrityError> {
Member

Do we ever use this as public API? If not, might be better to make it private, since it kinda invites time-of-check/time-of-use problems.

Contributor Author

I'd leave this public:

  1. it can be useful to fail fast
  2. it does not return the data, so at worst it could be redundant
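The two points above can be sketched as follows. This is an illustration under assumptions, not code from the PR: `Block` is a simplified stand-in for `ChecksummedBlock`, and `concat_all` is a hypothetical caller. `validate` is used only to reject bad input early, while the data itself is still released exclusively through the validating `into_bytes`.

```rust
/// Reflected CRC-32C (Castagnoli) polynomial.
const POLY: u32 = 0x82F6_3B78;

/// Bitwise CRC-32C over `data`.
fn crc32c(data: &[u8]) -> u32 {
    let mut reg = 0xFFFF_FFFFu32;
    for &byte in data {
        reg ^= u32::from(byte);
        for _ in 0..8 {
            reg = if reg & 1 != 0 { (reg >> 1) ^ POLY } else { reg >> 1 };
        }
    }
    reg ^ 0xFFFF_FFFF
}

#[derive(Debug)]
struct ChecksumMismatch {
    expected: u32,
    actual: u32,
}

/// Simplified stand-in for `ChecksummedBlock`.
struct Block {
    bytes: Vec<u8>,
    checksum: u32,
}

impl Block {
    fn from_bytes(bytes: &[u8]) -> Self {
        Self { bytes: bytes.to_vec(), checksum: crc32c(bytes) }
    }

    /// Checks integrity without handing out the data.
    fn validate(&self) -> Result<(), ChecksumMismatch> {
        let actual = crc32c(&self.bytes);
        if actual != self.checksum {
            return Err(ChecksumMismatch { expected: self.checksum, actual });
        }
        Ok(())
    }

    /// The only way to obtain the raw bytes: validate, then release.
    fn into_bytes(self) -> Result<Vec<u8>, ChecksumMismatch> {
        self.validate()?;
        Ok(self.bytes)
    }
}

/// Hypothetical caller that concatenates the payloads of several blocks.
fn concat_all(blocks: Vec<Block>) -> Result<Vec<u8>, ChecksumMismatch> {
    // Fail fast: reject the whole batch before allocating or copying.
    for block in &blocks {
        block.validate()?;
    }
    let mut out = Vec::new();
    for block in blocks {
        // Validation happens again at the point of use, so the earlier
        // check is at worst redundant and never the only line of defence.
        out.extend(block.into_bytes()?);
    }
    Ok(out)
}
```

Because `validate` returns `()` rather than the data, a caller cannot act on bytes that were checked earlier and corrupted since; it still has to go through `into_bytes`.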

let checksum = crc32c::checksum(&self.bytes);
if self.checksum != checksum {
return Err(IntegrityError::ChecksumMismatch(self.checksum, checksum));
}
Ok(())
}
}

impl Default for ChecksummedBlock {
fn default() -> Self {
let bytes = Bytes::new();
let checksum = Crc32c::new(0);
Self { bytes, checksum }
}
}

impl From<ChecksummedBlock> for ChecksummedBytes {
fn from(value: ChecksummedBlock) -> Self {
value.into_checksummed_bytes()
}
}

// Implement equality for tests only. The comparison panics if the data is corrupted.
#[cfg(test)]
impl PartialEq for ChecksummedBlock {
fn eq(&self, other: &Self) -> bool {
if self.bytes != other.bytes {
return false;
}

if self.checksum == other.checksum {
return true;
}

self.validate().expect("should be valid");
other.validate().expect("should be valid");
Comment on lines +115 to +124
Member

Doesn't really matter since it's just test code, but I think you want to do it this way to be correctly bracketed:

let result = self.bytes == other.bytes;
self.validate().expect("should be valid");
other.validate().expect("should be valid");
result


true
Member

Shouldn't this be unreachable? At this point we know the bytes are equal but the checksums aren't, yet both sides passed validation.

}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use mountpoint_s3_crt::checksums::crc32c;

use super::*;

#[test]
fn test_into_bytes() {
let bytes = Bytes::from_static(b"some bytes");
let expected = bytes.clone();
let checksum = crc32c::checksum(&bytes);
let checksummed_block = ChecksummedBlock::new(bytes, checksum);

let actual = checksummed_block.into_bytes().unwrap();
assert_eq!(expected, actual);
}

#[test]
fn test_into_bytes_integrity_error() {
let bytes = Bytes::from_static(b"some bytes");
let checksum = crc32c::checksum(&bytes);
let mut checksummed_block = ChecksummedBlock::new(bytes, checksum);
checksummed_block.bytes = Bytes::from_static(b"new bytes");

let actual = checksummed_block.into_bytes();
assert!(matches!(actual, Err(IntegrityError::ChecksumMismatch(_, _))));
}

#[test]
fn test_into_checksummed_bytes() {
let bytes = Bytes::from_static(b"some bytes");
let checksum = crc32c::checksum(&bytes);
let checksummed_block = ChecksummedBlock::new(bytes, checksum);
let checksummed_bytes = checksummed_block.clone().into_checksummed_bytes();

assert_eq!(
checksummed_block.into_bytes().unwrap(),
checksummed_bytes.into_bytes().unwrap()
);
}

#[test]
fn test_extend() {
let bytes = Bytes::from_static(b"some bytes");
let expected = Bytes::from_static(b"some bytes extended");
let checksum = crc32c::checksum(&bytes);
let mut checksummed_block = ChecksummedBlock::new(bytes, checksum);

let extend = Bytes::from_static(b" extended");
let extend_checksum = crc32c::checksum(&extend);
let extend = ChecksummedBlock::new(extend, extend_checksum);
checksummed_block.extend(extend);
let actual = checksummed_block.bytes;
assert_eq!(expected, actual);
}

#[test]
fn test_extend_self_corrupted() {
let bytes = Bytes::from_static(b"some bytes");
let checksum = crc32c::checksum(&bytes);
let mut checksummed_block = ChecksummedBlock::new(bytes, checksum);

let corrupted_bytes = Bytes::from_static(b"corrupted data");
checksummed_block.bytes = corrupted_bytes.clone();

let extend = Bytes::from_static(b" extended");
let extend_checksum = crc32c::checksum(&extend);
let extend = ChecksummedBlock::new(extend, extend_checksum);
checksummed_block.extend(extend);
assert!(matches!(
checksummed_block.validate(),
Err(IntegrityError::ChecksumMismatch(_, _))
));
}

#[test]
fn test_extend_other_corrupted() {
let bytes = Bytes::from_static(b"some bytes");
let checksum = crc32c::checksum(&bytes);
let mut checksummed_block = ChecksummedBlock::new(bytes, checksum);

let extend = Bytes::from_static(b" extended");
let extend_checksum = crc32c::checksum(&extend);
let mut extend = ChecksummedBlock::new(extend, extend_checksum);

let corrupted_bytes = Bytes::from_static(b"corrupted data");
extend.bytes = corrupted_bytes.clone();

checksummed_block.extend(extend);
assert!(matches!(
checksummed_block.validate(),
Err(IntegrityError::ChecksumMismatch(_, _))
));
}
}
@@ -1,6 +1,9 @@
use std::ops::RangeBounds;

use bytes::{Bytes, BytesMut};
use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
use thiserror::Error;

use crate::checksums::IntegrityError;

/// A `ChecksummedBytes` is a bytes buffer that carries its checksum.
/// The implementation guarantees that its integrity will be validated when data transformation occurs.
@@ -56,6 +59,18 @@ impl ChecksummedBytes {
}
}

/// Returns a slice of self for the provided range.
///
/// This operation just increases the reference count and sets a few indices,
/// so there will be no validation and the checksum will not be recomputed.
pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
Self {
orig_bytes: self.orig_bytes.clone(),
curr_slice: self.curr_slice.slice(range),
checksum: self.checksum,
}
}

/// Append the given checksummed bytes to current `ChecksummedBytes`, ensure that data integrity will
/// be validated.
///
@@ -118,13 +133,6 @@ impl Default for ChecksummedBytes {
}
}
}

#[derive(Debug, Error)]
pub enum IntegrityError {
#[error("Checksum mismatch. expected: {0:?}, actual: {1:?}")]
ChecksumMismatch(Crc32c, Crc32c),
}

// Implement equality for tests only. The comparison panics if the data is corrupted.
#[cfg(test)]
impl PartialEq for ChecksummedBytes {
@@ -149,9 +157,7 @@ mod tests {
use bytes::Bytes;
use mountpoint_s3_crt::checksums::crc32c;

use crate::prefetch::checksummed_bytes::IntegrityError;

use super::ChecksummedBytes;
use super::*;

#[test]
fn test_into_bytes() {
6 changes: 3 additions & 3 deletions mountpoint-s3/src/data_cache.rs
@@ -10,7 +10,7 @@ use std::ops::Range;

use thiserror::Error;

pub use crate::prefetch::checksummed_bytes::ChecksummedBytes;
pub use crate::checksums::ChecksummedBlock;

/// Indexes blocks within a given object.
pub type BlockIndex = u64;
@@ -36,10 +36,10 @@ pub trait DataCache<Key> {
/// Get block of data from the cache for the given [Key] and [BlockIndex], if available.
///
/// Operation may fail due to errors, or return [None] if the block was not available in the cache.
fn get_block(&self, cache_key: &Key, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>>;
fn get_block(&self, cache_key: &Key, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBlock>>;

/// Put block of data to the cache for the given [Key] and [BlockIndex].
fn put_block(&self, cache_key: Key, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()>;
fn put_block(&self, cache_key: Key, block_idx: BlockIndex, bytes: ChecksummedBlock) -> DataCacheResult<()>;

/// Returns the block size for the data cache.
fn block_size(&self) -> u64;
24 changes: 11 additions & 13 deletions mountpoint-s3/src/data_cache/in_memory_data_cache.rs
@@ -5,12 +5,12 @@ use std::default::Default;
use std::hash::Hash;
use std::ops::Range;

use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};
use super::{BlockIndex, ChecksummedBlock, DataCache, DataCacheResult};
use crate::sync::RwLock;

/// Simple in-memory (RAM) implementation of [DataCache]. Recommended for use in testing only.
pub struct InMemoryDataCache<CacheKey> {
data: RwLock<HashMap<CacheKey, HashMap<BlockIndex, ChecksummedBytes>>>,
data: RwLock<HashMap<CacheKey, HashMap<BlockIndex, ChecksummedBlock>>>,
block_size: u64,
}

@@ -25,13 +25,13 @@ impl<Key> InMemoryDataCache<Key> {
}

impl<Key: Eq + Hash> DataCache<Key> for InMemoryDataCache<Key> {
fn get_block(&self, cache_key: &Key, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBytes>> {
fn get_block(&self, cache_key: &Key, block_idx: BlockIndex) -> DataCacheResult<Option<ChecksummedBlock>> {
let data = self.data.read().unwrap();
let block_data = data.get(cache_key).and_then(|blocks| blocks.get(&block_idx)).cloned();
Ok(block_data)
}

fn put_block(&self, cache_key: Key, block_idx: BlockIndex, bytes: ChecksummedBytes) -> DataCacheResult<()> {
fn put_block(&self, cache_key: Key, block_idx: BlockIndex, bytes: ChecksummedBlock) -> DataCacheResult<()> {
let mut data = self.data.write().unwrap();
let blocks = data.entry(cache_key).or_default();
blocks.insert(block_idx, bytes);
@@ -59,20 +59,18 @@ mod tests {

use bytes::Bytes;

use mountpoint_s3_crt::checksums::crc32c;

type TestCacheKey = String;

#[test]
fn test_put_get() {
let data_1 = Bytes::from_static(b"Hello world");
let data_1 = ChecksummedBytes::new(data_1.clone(), crc32c::checksum(&data_1));
let data_1 = ChecksummedBlock::from_bytes(data_1.clone());
let data_2 = Bytes::from_static(b"Foo bar");
let data_2 = ChecksummedBytes::new(data_2.clone(), crc32c::checksum(&data_2));
let data_2 = ChecksummedBlock::from_bytes(data_2.clone());
let data_3 = Bytes::from_static(b"Baz");
let data_3 = ChecksummedBytes::new(data_3.clone(), crc32c::checksum(&data_3));
let data_3 = ChecksummedBlock::from_bytes(data_3.clone());

let mut cache = InMemoryDataCache::new(8 * 1024 * 1024);
let cache = InMemoryDataCache::new(8 * 1024 * 1024);
let cache_key_1: TestCacheKey = String::from("a");
let cache_key_2: TestCacheKey = String::from("b");

@@ -136,11 +134,11 @@ mod tests {
#[test]
fn test_cached_indices() {
let data_1 = Bytes::from_static(b"Hello world");
let data_1 = ChecksummedBytes::new(data_1.clone(), crc32c::checksum(&data_1));
let data_1 = ChecksummedBlock::from_bytes(data_1.clone());
let data_2 = Bytes::from_static(b"Foo bar");
let data_2 = ChecksummedBytes::new(data_2.clone(), crc32c::checksum(&data_2));
let data_2 = ChecksummedBlock::from_bytes(data_2.clone());

let mut cache = InMemoryDataCache::new(8 * 1024 * 1024);
let cache = InMemoryDataCache::new(8 * 1024 * 1024);
let cache_key_1: TestCacheKey = String::from("a");
let cache_key_2: TestCacheKey = String::from("b");
