Use new ChecksummedBlock in DataCache #572
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
mod block; | ||
mod bytes; | ||
|
||
use mountpoint_s3_crt::checksums::crc32c::Crc32c; | ||
|
||
use thiserror::Error; | ||
|
||
pub use block::ChecksummedBlock; | ||
pub use bytes::ChecksummedBytes; | ||
|
||
/// Calculates the combined checksum for `AB` where `prefix_crc` is the checksum for `A`, | ||
/// `suffix_crc` is the checksum for `B`, and `suffix_len` is the length of `B`. | ||
pub fn combine_checksums(prefix_crc: Crc32c, suffix_crc: Crc32c, suffix_len: usize) -> Crc32c { | ||
let combined = ::crc32c::crc32c_combine(prefix_crc.value(), suffix_crc.value(), suffix_len); | ||
Crc32c::new(combined) | ||
} | ||
|
||
#[derive(Debug, Error)] | ||
pub enum IntegrityError { | ||
#[error("Checksum mismatch. expected: {0:?}, actual: {1:?}")] | ||
ChecksumMismatch(Crc32c, Crc32c), | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use mountpoint_s3_crt::checksums::crc32c; | ||
|
||
#[test] | ||
fn test_combine_checksums() { | ||
let buf: &[u8] = b"123456789"; | ||
let (buf1, buf2) = buf.split_at(4); | ||
let crc = crc32c::checksum(buf); | ||
let crc1 = crc32c::checksum(buf1); | ||
let crc2 = crc32c::checksum(buf2); | ||
let combined = combine_checksums(crc1, crc2, buf2.len()); | ||
assert_eq!(combined, crc); | ||
} | ||
} |
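The identity that `combine_checksums` relies on can be checked without any external crate. The following is a minimal sketch, not the `crc32c` crate's implementation: `raw_update`, `crc32c`, and `combine_naive` are hypothetical names introduced here, the CRC is computed bit by bit, and the combine step runs in time linear in `suffix_len` (a production implementation would typically be much faster). It relies on the CRC register update being linear over GF(2): pushing `suffix_len` zero bytes through the raw register starting from the prefix checksum and XOR-ing the result with the suffix checksum gives the checksum of the concatenation.

```rust
/// Reflected CRC-32C (Castagnoli) polynomial.
const POLY: u32 = 0x82F6_3B78;

/// Feed one byte into the raw CRC register (no initial or final XOR).
fn raw_update(mut state: u32, byte: u8) -> u32 {
    state ^= u32::from(byte);
    for _ in 0..8 {
        state = if state & 1 == 1 { (state >> 1) ^ POLY } else { state >> 1 };
    }
    state
}

/// Bitwise CRC-32C of `data` (initial value and final XOR are both all ones).
fn crc32c(data: &[u8]) -> u32 {
    !data.iter().fold(!0u32, |state, &b| raw_update(state, b))
}

/// Naive combine (assumption: illustrative only, O(suffix_len)).
/// Shifts `prefix_crc` past `suffix_len` zero bytes, then XORs in `suffix_crc`.
fn combine_naive(prefix_crc: u32, suffix_crc: u32, suffix_len: usize) -> u32 {
    let shifted = (0..suffix_len).fold(prefix_crc, |state, _| raw_update(state, 0));
    shifted ^ suffix_crc
}
```

Splitting `b"123456789"` at index 4, as `test_combine_checksums` does, and passing the two partial checksums plus the suffix length to `combine_naive` should reproduce the checksum of the whole buffer.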
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
use bytes::{Bytes, BytesMut}; | ||
use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c}; | ||
|
||
use crate::checksums::{bytes::ChecksummedBytes, combine_checksums, IntegrityError}; | ||
|
||
/// A `ChecksummedBlock` is a bytes buffer that carries its checksum. | ||
/// The implementation guarantees that its integrity will be validated when data is accessed. | ||
#[derive(Debug, Clone)] | ||
pub struct ChecksummedBlock { | ||
bytes: Bytes, | ||
checksum: Crc32c, | ||
} | ||
|
||
impl ChecksummedBlock { | ||
pub fn new(bytes: Bytes, checksum: Crc32c) -> Self { | ||
Self { bytes, checksum } | ||
} | ||
|
||
/// Create `ChecksummedBlock` from `Bytes`, calculating its checksum. | ||
pub fn from_bytes(bytes: Bytes) -> Self { | ||
let checksum = crc32c::checksum(&bytes); | ||
Self::new(bytes, checksum) | ||
} | ||
|
||
|
||
/// Convert the `ChecksummedBlock` into `Bytes`. Data integrity is validated before converting. | ||
/// | ||
/// Return `IntegrityError` on data corruption. | ||
pub fn into_bytes(self) -> Result<Bytes, IntegrityError> { | ||
self.validate()?; | ||
|
||
Ok(self.bytes) | ||
} | ||
|
||
|
||
/// Convert into a `ChecksummedBytes`. | ||
pub fn into_checksummed_bytes(self) -> ChecksummedBytes { | ||
ChecksummedBytes::new(self.bytes, self.checksum) | ||
} | ||
|
||
/// Returns the number of bytes contained in this `ChecksummedBlock`. | ||
pub fn len(&self) -> usize { | ||
self.bytes.len() | ||
} | ||
|
||
/// Returns true if the `ChecksummedBlock` has a length of 0. | ||
pub fn is_empty(&self) -> bool { | ||
self.bytes.is_empty() | ||
} | ||
|
||
/// Append the given bytes to current `ChecksummedBlock`. | ||
pub fn extend(&mut self, extend: ChecksummedBlock) { | ||
if self.is_empty() { | ||
*self = extend; | ||
return; | ||
} | ||
if extend.is_empty() { | ||
return; | ||
} | ||
Comment on lines +51 to +57:
For these cases you probably need to validate the checksum of the empty side (which will be trivial to compute because they're zero-length slices), because the length might have been corrupted.
||
|
||
let total_len = self.bytes.len() + extend.len(); | ||
let mut bytes_mut = BytesMut::with_capacity(total_len); | ||
bytes_mut.extend_from_slice(&self.bytes); | ||
bytes_mut.extend_from_slice(&extend.bytes); | ||
let new_bytes = bytes_mut.freeze(); | ||
let new_checksum = combine_checksums(self.checksum, extend.checksum, extend.len()); | ||
*self = ChecksummedBlock { | ||
bytes: new_bytes, | ||
checksum: new_checksum, | ||
}; | ||
} | ||
Comment on lines +49 to +69:
I think this is safe since we are taking two checksummed buffers, combining the two, and calculating the new checksum independently of the new buffer. IMO the durability risk here is mitigated, but I'd also like a second opinion from the team.

Yeah, that sounds right. We know the expected checksum of each side (unlike in the
Can you add a comment here capturing that reasoning?
||
|
||
/// Validate data integrity in this `ChecksummedBlock`. | ||
/// | ||
/// Return `IntegrityError` on data corruption. | ||
pub fn validate(&self) -> Result<(), IntegrityError> { | ||
Do we ever use this as public API? If not, might be better to make it private, since it kinda invites time-of-check/time-of-use problems.

I'd leave this public:
|
||
let checksum = crc32c::checksum(&self.bytes); | ||
if self.checksum != checksum { | ||
return Err(IntegrityError::ChecksumMismatch(self.checksum, checksum)); | ||
} | ||
Ok(()) | ||
} | ||
} | ||
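The review suggestion on `extend` (validate the empty side before taking an early return) can be sketched as follows. This is a simplified stand-in under stated assumptions, not the PR's code: `Block`, `ChecksumMismatch`, and `combine` are hypothetical names, the buffer is a `Vec<u8>` instead of `Bytes`, the CRC is a bitwise implementation, and `extend` returns a `Result`, whereas the method in the diff returns nothing.

```rust
const POLY: u32 = 0x82F6_3B78; // reflected CRC-32C polynomial

fn raw_update(mut state: u32, byte: u8) -> u32 {
    state ^= u32::from(byte);
    for _ in 0..8 {
        state = if state & 1 == 1 { (state >> 1) ^ POLY } else { state >> 1 };
    }
    state
}

fn crc32c(data: &[u8]) -> u32 {
    !data.iter().fold(!0u32, |state, &b| raw_update(state, b))
}

/// Naive O(suffix_len) combine of two CRC-32C values (illustrative only).
fn combine(prefix_crc: u32, suffix_crc: u32, suffix_len: usize) -> u32 {
    (0..suffix_len).fold(prefix_crc, |state, _| raw_update(state, 0)) ^ suffix_crc
}

#[derive(Debug)]
struct ChecksumMismatch {
    expected: u32,
    actual: u32,
}

/// Hypothetical, simplified stand-in for `ChecksummedBlock`.
#[derive(Debug, Clone)]
struct Block {
    bytes: Vec<u8>,
    checksum: u32,
}

impl Block {
    fn from_bytes(bytes: Vec<u8>) -> Self {
        let checksum = crc32c(&bytes);
        Self { bytes, checksum }
    }

    fn validate(&self) -> Result<(), ChecksumMismatch> {
        let actual = crc32c(&self.bytes);
        if actual == self.checksum {
            Ok(())
        } else {
            Err(ChecksumMismatch { expected: self.checksum, actual })
        }
    }

    /// Append `other`. An empty side is validated before it is discarded,
    /// because a corrupted length could make a non-empty block look empty.
    fn extend(&mut self, other: Block) -> Result<(), ChecksumMismatch> {
        if self.bytes.is_empty() {
            self.validate()?; // cheap: CRC of a zero-length slice
            *self = other;
            return Ok(());
        }
        if other.bytes.is_empty() {
            other.validate()?;
            return Ok(());
        }
        // The new checksum is derived from the two expected checksums, not
        // from the concatenated data, so existing corruption stays detectable.
        let checksum = combine(self.checksum, other.checksum, other.bytes.len());
        self.bytes.extend_from_slice(&other.bytes);
        self.checksum = checksum;
        Ok(())
    }
}
```

In this sketch a block whose data was lost but whose checksum survived is rejected by `extend` instead of being dropped silently, while corruption in a non-empty side still surfaces on the next `validate` call.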
|
||
impl Default for ChecksummedBlock { | ||
fn default() -> Self { | ||
let bytes = Bytes::new(); | ||
let checksum = Crc32c::new(0); | ||
Self { bytes, checksum } | ||
} | ||
} | ||
|
||
impl From<ChecksummedBlock> for ChecksummedBytes { | ||
fn from(value: ChecksummedBlock) -> Self { | ||
value.into_checksummed_bytes() | ||
} | ||
} | ||
|
||
impl From<Bytes> for ChecksummedBlock { | ||
fn from(value: Bytes) -> Self { | ||
Self::from_bytes(value) | ||
} | ||
} | ||
|
||
impl TryFrom<ChecksummedBlock> for Bytes { | ||
type Error = IntegrityError; | ||
|
||
fn try_from(value: ChecksummedBlock) -> Result<Self, Self::Error> { | ||
value.into_bytes() | ||
} | ||
} | ||
|
||
// Implement equality for tests only. We implement equality, and will panic if the data is corrupted. | ||
#[cfg(test)] | ||
impl PartialEq for ChecksummedBlock { | ||
fn eq(&self, other: &Self) -> bool { | ||
if self.bytes != other.bytes { | ||
return false; | ||
} | ||
|
||
if self.checksum == other.checksum { | ||
return true; | ||
} | ||
|
||
self.validate().expect("should be valid"); | ||
other.validate().expect("should be valid"); | ||
Comment on lines +115 to +124:
Doesn't really matter since it's just test code, but I think you want to do it this way to be correctly bracketed:
    let result = self.bytes == other.bytes;
    self.validate().expect("should be valid");
    other.validate().expect("should be valid");
    result
||
|
||
true | ||
This should be unreachable? Here we know the bytes are equal but the checksums aren't, but they both passed validation?
||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use bytes::Bytes; | ||
use mountpoint_s3_crt::checksums::crc32c; | ||
|
||
use super::*; | ||
|
||
#[test] | ||
fn test_into_bytes() { | ||
let bytes = Bytes::from_static(b"some bytes"); | ||
let expected = bytes.clone(); | ||
let checksum = crc32c::checksum(&bytes); | ||
let checksummed_block = ChecksummedBlock::new(bytes, checksum); | ||
|
||
let actual = checksummed_block.into_bytes().unwrap(); | ||
assert_eq!(expected, actual); | ||
} | ||
|
||
#[test] | ||
fn test_into_bytes_integrity_error() { | ||
let bytes = Bytes::from_static(b"some bytes"); | ||
let checksum = crc32c::checksum(&bytes); | ||
let mut checksummed_block = ChecksummedBlock::new(bytes, checksum); | ||
checksummed_block.bytes = Bytes::from_static(b"new bytes"); | ||
|
||
let actual = checksummed_block.into_bytes(); | ||
assert!(matches!(actual, Err(IntegrityError::ChecksumMismatch(_, _)))); | ||
} | ||
|
||
#[test] | ||
fn test_into_checksummed_bytes() { | ||
let bytes = Bytes::from_static(b"some bytes"); | ||
let checksum = crc32c::checksum(&bytes); | ||
let checksummed_block = ChecksummedBlock::new(bytes, checksum); | ||
let checksummed_bytes = checksummed_block.clone().into_checksummed_bytes(); | ||
|
||
assert_eq!( | ||
checksummed_block.into_bytes().unwrap(), | ||
checksummed_bytes.into_bytes().unwrap() | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_extend() { | ||
let bytes = Bytes::from_static(b"some bytes"); | ||
let extend = Bytes::from_static(b" extended"); | ||
let expected = Bytes::from_static(b"some bytes extended"); | ||
|
||
let mut checksummed_block = ChecksummedBlock::from_bytes(bytes); | ||
checksummed_block.extend(extend.into()); | ||
let actual = checksummed_block.bytes; | ||
assert_eq!(expected, actual); | ||
} | ||
|
||
#[test] | ||
fn test_extend_self_corrupted() { | ||
let checksum = crc32c::checksum(b"some bytes"); | ||
let corrupted_bytes = Bytes::from_static(b"corrupted data"); | ||
let mut checksummed_block = ChecksummedBlock::new(corrupted_bytes, checksum); | ||
|
||
let extend = Bytes::from_static(b" extended"); | ||
checksummed_block.extend(extend.into()); | ||
assert!(matches!( | ||
checksummed_block.validate(), | ||
Err(IntegrityError::ChecksumMismatch(_, _)) | ||
)); | ||
} | ||
|
||
#[test] | ||
fn test_extend_other_corrupted() { | ||
let bytes = Bytes::from_static(b"some bytes"); | ||
let mut checksummed_block = ChecksummedBlock::from_bytes(bytes); | ||
|
||
let corrupted_bytes = Bytes::from_static(b"corrupted data"); | ||
let extend_checksum = crc32c::checksum(b" extended"); | ||
let extend_block = ChecksummedBlock::new(corrupted_bytes, extend_checksum); | ||
|
||
checksummed_block.extend(extend_block); | ||
assert!(matches!( | ||
checksummed_block.validate(), | ||
Err(IntegrityError::ChecksumMismatch(_, _)) | ||
)); | ||
} | ||
} |
I was wondering if it might be nicer to have just one implementation of this stuff, and give `ChecksummedBytes` a `shrink_to_fit`-style method to get the guarantee you're looking for. But then I guess that makes `extend` et al more complicated because you have to handle all the different combinations to decide when you can skip validating the checksums, so probably not worth it?

After all, I think `shrink_to_fit` would be a better approach and can also be used to improve `extend`. I will close this PR and open a new one with that change.
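
For readers unfamiliar with the `shrink_to_fit` idea discussed above, here is one way such a method could look. This is a hypothetical sketch and not the follow-up PR: it assumes a type whose checksum covers a whole buffer while only a sub-range is visible, and the names `SlicedBytes`, `slice`, and `ChecksumMismatch` are invented for illustration. After `shrink_to_fit` the checksum covers exactly the visible bytes, which is the guarantee `ChecksummedBlock` was introduced to provide.

```rust
use std::ops::Range;

const POLY: u32 = 0x82F6_3B78; // reflected CRC-32C polynomial

/// Bitwise CRC-32C (illustrative, not optimized).
fn crc32c(data: &[u8]) -> u32 {
    let mut state = !0u32;
    for &byte in data {
        state ^= u32::from(byte);
        for _ in 0..8 {
            state = if state & 1 == 1 { (state >> 1) ^ POLY } else { state >> 1 };
        }
    }
    !state
}

#[derive(Debug)]
struct ChecksumMismatch {
    expected: u32,
    actual: u32,
}

/// Hypothetical stand-in: `checksum` covers all of `buffer`,
/// but only `buffer[range]` is exposed to callers.
struct SlicedBytes {
    buffer: Vec<u8>,
    range: Range<usize>,
    checksum: u32,
}

impl SlicedBytes {
    fn from_bytes(buffer: Vec<u8>) -> Self {
        let checksum = crc32c(&buffer);
        let range = 0..buffer.len();
        Self { buffer, range, checksum }
    }

    /// Narrow the visible range (relative to the current one); checksum untouched.
    fn slice(&mut self, range: Range<usize>) {
        assert!(range.start <= range.end && range.end <= self.range.len());
        let start = self.range.start + range.start;
        self.range = start..start + range.len();
    }

    fn validate(&self) -> Result<(), ChecksumMismatch> {
        let actual = crc32c(&self.buffer);
        if actual == self.checksum {
            Ok(())
        } else {
            Err(ChecksumMismatch { expected: self.checksum, actual })
        }
    }

    /// Make the checksum cover exactly the visible bytes.
    fn shrink_to_fit(&mut self) -> Result<(), ChecksumMismatch> {
        if self.range.len() == self.buffer.len() {
            return Ok(()); // already exact, nothing to validate or recompute
        }
        // Compute the new checksum first, then validate the whole buffer, so
        // the data the new checksum was derived from is covered by the old one.
        let new_checksum = crc32c(&self.buffer[self.range.clone()]);
        self.validate()?;
        let visible = self.buffer[self.range.clone()].to_vec();
        self.range = 0..visible.len();
        self.buffer = visible;
        self.checksum = new_checksum;
        Ok(())
    }
}
```

The trade-off the reviewer points at is visible here: operations such as `extend` would need to decide, per combination of exact and non-exact operands, when the validation step can be skipped.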