Skip to content

Commit

Permalink
Improve support for additional checksum algorithms in mountpoint-s3-c…
Browse files Browse the repository at this point in the history
…lient (#1157)

Allows to specify any of the supported checksum algorithms when
uploading objects.

### Does this change impact existing behavior?

No.

### Does this change need a changelog entry?

Yes, adding an entry to the `mountpoint-s3-client` changelog.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Nov 22, 2024
1 parent 47e1d56 commit 458ffdc
Show file tree
Hide file tree
Showing 13 changed files with 384 additions and 122 deletions.
2 changes: 2 additions & 0 deletions mountpoint-s3-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* `ChecksumAlgorithm` has a new variant `Unknown(String)`,
to accomodate algorithms not recognized by the client should they be added in future.
([#1086](https://github.com/awslabs/mountpoint-s3/pull/1086))
* Allow to specify any of the supported checksum algorithms when uploading objects with `put_object_single`.
([#1157](https://github.com/awslabs/mountpoint-s3/pull/1157))

### Breaking changes

Expand Down
68 changes: 67 additions & 1 deletion mountpoint-s3-client/src/checksums.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! Provides base64 encoding/decoding for CRC32C checksums.
//! Provides base64 encoding/decoding for various checksums.
pub use mountpoint_s3_crt::checksums::crc32::{self, Crc32};
pub use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};
pub use mountpoint_s3_crt::checksums::sha1::{self, Sha1};
pub use mountpoint_s3_crt::checksums::sha256::{self, Sha256};

use base64ct::Base64;
use base64ct::Encoding;
Expand All @@ -17,6 +20,28 @@ pub fn crc32c_from_base64(base64_str: &str) -> Result<Crc32c, ParseError> {
Ok(Crc32c::new(u32::from_be_bytes(dec_buf)))
}

/// The base64 encoding for this CRC32 checksum value.
pub fn crc32_to_base64(checksum: &Crc32) -> String {
Base64::encode_string(&checksum.value().to_be_bytes())
}

/// Create a CRC32 checksum from a base64 encoding.
pub fn crc32_from_base64(base64_str: &str) -> Result<Crc32, ParseError> {
let mut dec_buf = [0u8; std::mem::size_of::<u32>()];
let _ = Base64::decode(base64_str, &mut dec_buf)?;
Ok(Crc32::new(u32::from_be_bytes(dec_buf)))
}

/// The base64 encoding for this SHA1 checksum value.
pub fn sha1_to_base64(checksum: &Sha1) -> String {
Base64::encode_string(checksum.value())
}

/// The base64 encoding for this SHA256 checksum value.
pub fn sha256_to_base64(checksum: &Sha256) -> String {
Base64::encode_string(checksum.value())
}

/// Error parsing CRC32C checksums.
#[derive(Error, Debug)]
pub enum ParseError {
Expand Down Expand Up @@ -51,4 +76,45 @@ mod tests {
let err = crc32c_from_base64(invalid_base64).expect_err("parsing should fail");
assert!(matches!(err, ParseError::Base64ParseError(_)));
}

#[test]
fn test_crc32_to_base64() {
let crc = Crc32::new(1234);
let base64 = crc32_to_base64(&crc);
assert_eq!(&base64, "AAAE0g==");
}

#[test]
fn test_crc32_from_base64() {
let base64 = "AAAE0g==";
let crc = crc32_from_base64(base64).expect("parsing should succeeed");
assert_eq!(crc.value(), 1234);
}

#[test_case("AAA")]
#[test_case("AAAE0g")]
#[test_case("AAAE0gAA==")]
fn test_crc32_from_base64_error(invalid_base64: &str) {
let err = crc32_from_base64(invalid_base64).expect_err("parsing should fail");
assert!(matches!(err, ParseError::Base64ParseError(_)));
}

#[test]
fn test_sha1_to_base64() {
let sha1 = Sha1::new([
247, 195, 188, 29, 128, 142, 4, 115, 42, 223, 103, 153, 101, 204, 195, 76, 167, 174, 52, 65,
]);
let base64 = sha1_to_base64(&sha1);
assert_eq!(&base64, "98O8HYCOBHMq32eZZczDTKeuNEE=");
}

#[test]
fn test_sha256_to_base64() {
let sha256 = Sha256::new([
21, 226, 176, 211, 195, 56, 145, 235, 176, 241, 239, 96, 158, 196, 25, 66, 12, 32, 227, 32, 206, 148, 198,
95, 188, 140, 51, 18, 68, 142, 178, 37,
]);
let base64 = sha256_to_base64(&sha256);
assert_eq!(&base64, "FeKw08M4keuw8e9gnsQZQgwg4yDOlMZfvIwzEkSOsiU=");
}
}
103 changes: 74 additions & 29 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use mountpoint_s3_crt::checksums::crc32c;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use rand::seq::SliceRandom;
use rand::SeedableRng;
Expand All @@ -22,7 +21,9 @@ use thiserror::Error;
use time::OffsetDateTime;
use tracing::trace;

use crate::checksums::crc32c_to_base64;
use crate::checksums::{
crc32, crc32_to_base64, crc32c, crc32c_to_base64, sha1, sha1_to_base64, sha256, sha256_to_base64,
};
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use crate::object_client::{
Checksum, ChecksumAlgorithm, ChecksumMode, CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError,
Expand Down Expand Up @@ -178,6 +179,29 @@ impl MockClient {
}
}

/// Mock implementation of PutObject.
fn mock_put_object<'a>(
&self,
key: &str,
params: &PutObjectSingleParams,
contents: impl AsRef<[u8]> + Send + 'a,
) -> ObjectClientResult<PutObjectResult, PutObjectError, MockClientError> {
let checksum = validate_checksum(contents.as_ref(), params.checksum.as_ref())?;

let mut object: MockObject = contents.into();
object.set_storage_class(params.storage_class.clone());
object.set_object_metadata(params.object_metadata.clone());
object.set_checksum(checksum);

let etag = object.etag.clone();
add_object(&self.objects, key, object);
Ok(PutObjectResult {
etag,
sse_type: None,
sse_kms_key_id: None,
})
}

/// Track number of operations for verifying API calls made by the client in testing.
fn inc_op_count(&self, operation: Operation) {
let mut op_counts = self.operation_counts.write().unwrap();
Expand Down Expand Up @@ -454,6 +478,11 @@ impl MockObject {
}
}

pub fn with_computed_checksums(mut self, algorithms: &[ChecksumAlgorithm]) -> Self {
self.checksum = compute_checksum(&self.read(0, self.size), algorithms);
self
}

pub fn set_last_modified(&mut self, last_modified: OffsetDateTime) {
self.last_modified = last_modified;
}
Expand Down Expand Up @@ -505,6 +534,46 @@ impl std::fmt::Debug for MockObject {
}
}

fn compute_checksum(content: &[u8], algorithms: &[ChecksumAlgorithm]) -> Checksum {
let mut checksum = Checksum::empty();
for algorithm in algorithms {
match algorithm {
ChecksumAlgorithm::Crc32 => {
let crc32 = crc32::checksum(content);
checksum.checksum_crc32 = Some(crc32_to_base64(&crc32));
}
ChecksumAlgorithm::Crc32c => {
let crc32c = crc32c::checksum(content);
checksum.checksum_crc32c = Some(crc32c_to_base64(&crc32c));
}
ChecksumAlgorithm::Sha1 => {
let sha1 = sha1::checksum(content).expect("sha1 computation failed");
checksum.checksum_sha1 = Some(sha1_to_base64(&sha1));
}
ChecksumAlgorithm::Sha256 => {
let sha256 = sha256::checksum(content).expect("sha256 computation failed");
checksum.checksum_sha256 = Some(sha256_to_base64(&sha256));
}
algorithm => unimplemented!("unknown checksum algorithm: {:?}", algorithm),
};
}
checksum
}

/// Validate data against the [UploadChecksum] and return the [Checksum] to be stored.
fn validate_checksum(
contents: &[u8],
upload_checksum: Option<&UploadChecksum>,
) -> ObjectClientResult<Checksum, PutObjectError, MockClientError> {
let algorithm = upload_checksum.map(|c| c.checksum_algorithm());
let content_checksum = compute_checksum(contents, algorithm.as_slice());
let provided_checksum = upload_checksum.cloned().into();
if provided_checksum != content_checksum {
return Err(ObjectClientError::ServiceError(PutObjectError::BadChecksum));
}
Ok(provided_checksum)
}

#[derive(Debug)]
pub struct MockGetObjectRequest {
object: MockObject,
Expand Down Expand Up @@ -799,32 +868,7 @@ impl ObjectClient for MockClient {
return Err(ObjectClientError::ServiceError(PutObjectError::NoSuchBucket));
}

let content_crc32 = crc32c::checksum(contents.as_ref());
let mut object: MockObject = contents.into();
object.set_storage_class(params.storage_class.clone());
object.set_object_metadata(params.object_metadata.clone());
if let Some(upload_checksum) = &params.checksum {
let mut checksum = Checksum::empty();
match upload_checksum {
UploadChecksum::Crc32c(crc32c) => {
if crc32c != &content_crc32 {
return Err(ObjectClientError::ClientError(MockClientError(
"crc32c specified did not match data content".into(),
)));
}
checksum.checksum_crc32c = Some(crc32c_to_base64(crc32c));
}
}
object.set_checksum(checksum);
}

let etag = object.etag.clone();
add_object(&self.objects, key, object);
Ok(PutObjectResult {
etag,
sse_type: None,
sse_kms_key_id: None,
})
self.mock_put_object(key, params, contents)
}

async fn get_object_attributes(
Expand All @@ -847,7 +891,7 @@ impl ObjectClient for MockClient {
let mut result = GetObjectAttributesResult::default();
for attribute in object_attributes.iter() {
match attribute {
ObjectAttribute::ETag => result.etag = Some("TODO".to_owned()),
ObjectAttribute::ETag => result.etag = Some(object.etag.as_str().to_owned()),
ObjectAttribute::Checksum => result.checksum = Some(object.checksum.clone()),
ObjectAttribute::ObjectParts => {
let parts = match &object.parts {
Expand Down Expand Up @@ -1288,6 +1332,7 @@ mod tests {
let next = get_request.next().await.expect("result should not be empty");
assert_client_error!(next, "empty read window");
}

#[tokio::test]
async fn test_copy_object() {
let bucket = "test_bucket";
Expand Down
28 changes: 27 additions & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::collections::HashMap;
use thiserror::Error;
use time::OffsetDateTime;

use crate::checksums;
use crate::checksums::{self, crc32_to_base64, crc32c_to_base64, sha1_to_base64, sha256_to_base64};
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};

mod etag;
Expand Down Expand Up @@ -548,13 +548,19 @@ impl PutObjectSingleParams {
#[non_exhaustive]
pub enum UploadChecksum {
Crc32c(checksums::Crc32c),
Crc32(checksums::Crc32),
Sha1(checksums::Sha1),
Sha256(checksums::Sha256),
}

impl UploadChecksum {
/// The checksum algorithm used to compute this checksum.
pub fn checksum_algorithm(&self) -> ChecksumAlgorithm {
match self {
UploadChecksum::Crc32c(_) => ChecksumAlgorithm::Crc32c,
UploadChecksum::Crc32(_) => ChecksumAlgorithm::Crc32,
UploadChecksum::Sha1(_) => ChecksumAlgorithm::Sha1,
UploadChecksum::Sha256(_) => ChecksumAlgorithm::Sha256,
}
}
}
Expand Down Expand Up @@ -648,6 +654,12 @@ pub struct PutObjectResult {
pub enum PutObjectError {
#[error("The bucket does not exist")]
NoSuchBucket,

#[error("The provided checksum does not match the data")]
BadChecksum,

#[error("The server does not support the functionality required to fulfill the request")]
NotImplemented,
}

/// Restoration status for S3 objects in flexible retrieval storage classes.
Expand Down Expand Up @@ -797,6 +809,20 @@ impl Checksum {
}
}

impl From<Option<UploadChecksum>> for Checksum {
fn from(value: Option<UploadChecksum>) -> Self {
let mut checksum = Checksum::empty();
match value.as_ref() {
Some(UploadChecksum::Crc32c(crc32c)) => checksum.checksum_crc32c = Some(crc32c_to_base64(crc32c)),
Some(UploadChecksum::Crc32(crc32)) => checksum.checksum_crc32 = Some(crc32_to_base64(crc32)),
Some(UploadChecksum::Sha1(sha1)) => checksum.checksum_sha1 = Some(sha1_to_base64(sha1)),
Some(UploadChecksum::Sha256(sha256)) => checksum.checksum_sha256 = Some(sha256_to_base64(sha256)),
None => {}
};
checksum
}
}

/// Metadata about object parts from GetObjectAttributes API.
///
/// See [GetObjectAttributesParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAttributesParts.html)
Expand Down
5 changes: 4 additions & 1 deletion mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use pin_project::{pin_project, pinned_drop};
use thiserror::Error;
use tracing::{debug, error, trace, Span};

use crate::checksums::crc32c_to_base64;
use crate::checksums::{crc32_to_base64, crc32c_to_base64, sha1_to_base64, sha256_to_base64};
use crate::endpoint_config::EndpointError;
use crate::endpoint_config::{self, EndpointConfig};
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
Expand Down Expand Up @@ -939,6 +939,9 @@ impl<'a> S3Message<'a> {
) -> Result<(), mountpoint_s3_crt::common::error::Error> {
let header = match checksum {
UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)),
UploadChecksum::Crc32(crc32) => Header::new("x-amz-checksum-crc32", crc32_to_base64(crc32)),
UploadChecksum::Sha1(sha1) => Header::new("x-amz-checksum-sha1", sha1_to_base64(sha1)),
UploadChecksum::Sha256(sha256) => Header::new("x-amz-checksum-sha256", sha256_to_base64(sha256)),
};
self.inner.set_header(&header)
}
Expand Down
32 changes: 28 additions & 4 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::ops::Deref as _;
use std::os::unix::ffi::OsStrExt as _;
use std::sync::{Arc, Mutex};
use std::time::Instant;

Expand All @@ -8,7 +10,7 @@ use async_trait::async_trait;
use futures::channel::oneshot::{self, Receiver};
use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
use mountpoint_s3_crt::io::stream::InputStream;
use mountpoint_s3_crt::s3::client::{ChecksumConfig, RequestType, UploadReview};
use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview};
use tracing::error;

use super::{
Expand Down Expand Up @@ -136,7 +138,7 @@ impl S3CrtClient {
for (name, value) in &params.object_metadata {
message
.set_header(&Header::new(format!("x-amz-meta-{}", name), value))
.map_err(S3RequestError::construction_failure)?
.map_err(S3RequestError::construction_failure)?;
}
for (name, value) in &params.custom_headers {
message
Expand All @@ -150,8 +152,13 @@ impl S3CrtClient {
message.set_body_stream(Some(body_input_stream));

let options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObjectSingle);
self.inner
.make_simple_http_request_from_options(options, span, |_| {}, |_| None, on_headers)?
self.inner.make_simple_http_request_from_options(
options,
span,
|_| {},
parse_put_object_single_error,
on_headers,
)?
};

body.await?;
Expand Down Expand Up @@ -269,6 +276,23 @@ fn get_etag(response_headers: &Headers) -> Result<ETag, HeadersError> {
Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into())
}

fn parse_put_object_single_error(result: &MetaRequestResult) -> Option<PutObjectError> {
match result.response_status {
400 => {
let body = result.error_response_body.as_ref()?;
let root = xmltree::Element::parse(body.as_bytes()).ok()?;
let error_code = root.get_child("Code")?;
let error_str = error_code.get_text()?;
match error_str.deref() {
"BadDigest" => Some(PutObjectError::BadChecksum),
_ => None,
}
}
501 => Some(PutObjectError::NotImplemented),
_ => None,
}
}

fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
fn extract_result_headers_err(response_headers: Headers) -> Result<PutObjectResult, HeadersError> {
Ok(PutObjectResult {
Expand Down
Loading

0 comments on commit 458ffdc

Please sign in to comment.