Skip to content

Commit

Permalink
Support configuring SSE-KMS in S3CrtClient (#693)
Browse files Browse the repository at this point in the history
* Support configuring SSE-KMS (#534)

Signed-off-by: Vladislav Volodkin <[email protected]>

* Fix some of the CI jobs

Signed-off-by: Vladislav Volodkin <[email protected]>

* Don't do headers check when request failed, fix test

Signed-off-by: Vladislav Volodkin <[email protected]>

* Fix formatting

Signed-off-by: Vladislav Volodkin <[email protected]>

* Hide sse settings behind a feature flag

Signed-off-by: Vladislav Volodkin <[email protected]>

* Add tests for error cases

Signed-off-by: Vladislav Volodkin <[email protected]>

* Make the headers check to panic on failure

Signed-off-by: Vladislav Volodkin <[email protected]>

* Rename the feature flag

Signed-off-by: Vladislav Volodkin <[email protected]>

* Do not run sse tests for express buckets

Signed-off-by: Vladislav Volodkin <[email protected]>

* Move out cli changes to a separate PR

Signed-off-by: Vladislav Volodkin <[email protected]>

* Add extraction methods to ServerSideEncryption enum, fix documentation and formatting

Signed-off-by: Vladislav Volodkin <[email protected]>

* Make check_response_headers to check specifically for SSE settings

Signed-off-by: Vladislav Volodkin <[email protected]>

* Decompose SSE settings provided for S3PutObjectRequest

Signed-off-by: Vladislav Volodkin <[email protected]>

* Remove SSE enum, replace test for check_headers with a unit test

Signed-off-by: Vladislav Volodkin <[email protected]>

* Refactor check_response_headers

Signed-off-by: Vladislav Volodkin <[email protected]>

* Refactor check_response_headers call

Signed-off-by: Vladislav Volodkin <[email protected]>

* Improve comments

Signed-off-by: Vladislav Volodkin <[email protected]>

---------

Signed-off-by: Vladislav Volodkin <[email protected]>
Co-authored-by: Vladislav Volodkin <[email protected]>
  • Loading branch information
vladem and Vladislav Volodkin authored Jan 22, 2024
1 parent f5de97e commit ae0f475
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 34 deletions.
17 changes: 17 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ pub struct PutObjectParams {
pub trailing_checksums: bool,
/// Storage class to be used when creating new S3 object
pub storage_class: Option<String>,
/// The server-side encryption algorithm to be used for this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse)
pub server_side_encryption: Option<String>,
/// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be used to specify AWS KMS key ID to be used
/// when creating new S3 object
pub ssekms_key_id: Option<String>,
}

impl PutObjectParams {
Expand All @@ -293,6 +298,18 @@ impl PutObjectParams {
self.storage_class = Some(value);
self
}

/// Set server-side encryption type.
pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
self.server_side_encryption = value;
self
}

/// Set KMS key ID to be used for server-side encryption.
pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
self.ssekms_key_id = value;
self
}
}

/// Info for the caller to review before an upload completes.
Expand Down
5 changes: 3 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ impl S3CrtClientInner {
on_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
) -> Result<S3HttpRequest<Vec<u8>, E>, S3RequestError> {
let options = Self::new_meta_request_options(message, request_type);
self.make_simple_http_request_from_options(options, request_span, on_error)
self.make_simple_http_request_from_options(options, request_span, on_error, |_, _| ())
}

/// Make an HTTP request using this S3 client that returns the body on success or invokes the
Expand All @@ -580,6 +580,7 @@ impl S3CrtClientInner {
options: MetaRequestOptions,
request_span: Span,
on_error: impl FnOnce(&MetaRequestResult) -> Option<E> + Send + 'static,
on_headers: impl FnMut(&Headers, i32) + Send + 'static,
) -> Result<S3HttpRequest<Vec<u8>, E>, S3RequestError> {
// Accumulate the body of the response into this Vec<u8>
let body: Arc<Mutex<Vec<u8>>> = Default::default();
Expand All @@ -588,7 +589,7 @@ impl S3CrtClientInner {
self.make_meta_request_from_options(
options,
request_span,
|_, _| (),
on_headers,
move |offset, data| {
let mut body = body_clone.lock().unwrap();
assert_eq!(offset as usize, body.len());
Expand Down
122 changes: 117 additions & 5 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ use std::time::Instant;
use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult};
use crate::s3_crt_client::{emit_throughput_metric, S3CrtClient, S3RequestError};
use async_trait::async_trait;
use mountpoint_s3_crt::http::request_response::Header;
use mountpoint_s3_crt::http::request_response::{Header, Headers};
use mountpoint_s3_crt::io::async_stream::{self, AsyncStreamWriter};
use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestType, UploadReview};
use tracing::error;

use super::{S3CrtClientInner, S3HttpRequest};

const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";

impl S3CrtClient {
pub(super) async fn put_object(
&self,
Expand Down Expand Up @@ -45,19 +48,39 @@ impl S3CrtClient {
.set_header(&Header::new("x-amz-storage-class", storage_class))
.map_err(S3RequestError::construction_failure)?;
}

if let Some(sse) = params.server_side_encryption.as_ref() {
message
.set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse))
.map_err(S3RequestError::construction_failure)?;
}
if let Some(key_id) = params.ssekms_key_id.as_ref() {
message
.set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id))
.map_err(S3RequestError::construction_failure)?;
}
// Variable `response_headers` will be accessed from different threads: from CRT thread which executes `on_headers` callback
// and from our thread which executes `review_and_complete`. Callback `on_headers` is guaranteed to finish before this
// variable is accessed in `review_and_complete` (see `S3HttpRequest::poll` implementation).
let response_headers: Arc<Mutex<Option<Headers>>> = Default::default();
let response_headers_writer = response_headers.clone();
let on_headers = move |headers: &Headers, _: i32| {
*response_headers_writer.lock().unwrap() = Some(headers.clone());
};
let mut options = S3CrtClientInner::new_meta_request_options(message, MetaRequestType::PutObject);
options.on_upload_review(move |review| callback.invoke(review));
let body = self
.inner
.make_simple_http_request_from_options(options, span, |_| None)?;
.make_simple_http_request_from_options(options, span, |_| None, on_headers)?;

Ok(S3PutObjectRequest {
body,
writer,
review_callback,
start_time: Instant::now(),
total_bytes: 0,
response_headers,
server_side_encryption: params.server_side_encryption.clone(),
ssekms_key_id: params.ssekms_key_id.clone(),
})
}
}
Expand Down Expand Up @@ -106,6 +129,35 @@ pub struct S3PutObjectRequest {
review_callback: ReviewCallbackBox,
start_time: Instant,
total_bytes: u64,
/// Headers of the CompleteMultipartUpload response, available after the request was finished
response_headers: Arc<Mutex<Option<Headers>>>,
/// Server-side encryption type which is expected to be found in response_headers
server_side_encryption: Option<String>,
/// Server-side encryption KMS key ID which is expected to be found in response_headers
ssekms_key_id: Option<String>,
}

/// If non empty `server_side_encryption` or `ssekms_key_id` were used, this function checks headers
/// of the CompleteMultipartUpload response to contain the expected values
fn check_response_headers(response_headers: &Headers, expected_sse: Option<&str>, expected_key_id: Option<&str>) {
if let Some(sse_type) = expected_sse {
let actual_header = response_headers.get(SSE_TYPE_HEADER_NAME).ok();
let actual_value = actual_header.as_ref().and_then(|header| header.value().to_str());
assert_eq!(
actual_value,
Some(sse_type),
"SSE type provided in CompleteMultipartUpload response does not match the requested value",
);
}
if let Some(sse_key_id) = expected_key_id {
let actual_header = response_headers.get(SSE_KEY_ID_HEADER_NAME).ok();
let actual_value = actual_header.as_ref().and_then(|header| header.value().to_str());
assert_eq!(
actual_value,
Some(sse_key_id),
"SSE KMS key ID provided in CompleteMultipartUpload response does not match the requested value",
);
}
}

#[cfg_attr(not(docs_rs), async_trait)]
Expand All @@ -124,6 +176,9 @@ impl PutObjectRequest for S3PutObjectRequest {
self.review_and_complete(|_| true).await
}

/// Note: this function will panic if an SSE was requested to be applied to the object
/// and we failed to check that this actually happened. This may be caused by a bug in
/// CRT code or HTTP headers being corrupted in transit between us and the S3 server.
async fn review_and_complete(
mut self,
review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
Expand All @@ -137,11 +192,68 @@ impl PutObjectRequest for S3PutObjectRequest {
self.body
};

let result = body.await;
let _ = body.await?;

let elapsed = self.start_time.elapsed();
emit_throughput_metric(self.total_bytes, elapsed, "put_object");

result.map(|_| PutObjectResult {})
check_response_headers(
self.response_headers
.lock()
.expect("must be able to acquire headers lock")
.as_ref()
.expect("PUT response headers must be available at this point"),
self.server_side_encryption.as_deref(),
self.ssekms_key_id.as_deref(),
);
Ok(PutObjectResult {})
}
}

#[cfg(test)]
mod tests {
use super::{check_response_headers, Header, Headers, SSE_KEY_ID_HEADER_NAME, SSE_TYPE_HEADER_NAME};
use mountpoint_s3_crt::common::allocator::Allocator;
use test_case::test_case;

#[test_case(Some("sse:kms"), Some("some_key_alias"))]
#[test_case(Some("sse:kms:dsse"), Some("some_key_alias"))]
#[test_case(Some("sse:kms"), None)]
#[test_case(None, None)]
fn test_check_headers_ok(sse_type: Option<&str>, sse_kms_key_id: Option<&str>) {
let mut headers = Headers::new(&Allocator::default()).unwrap();
if let Some(sse_type) = sse_type {
let header = Header::new(SSE_TYPE_HEADER_NAME, sse_type);
headers.add_header(&header).unwrap();
}
if let Some(sse_kms_key_id) = sse_kms_key_id {
let header = Header::new(SSE_KEY_ID_HEADER_NAME, sse_kms_key_id);
headers.add_header(&header).unwrap();
}
check_response_headers(&headers, sse_type, sse_kms_key_id);
}

#[test]
#[should_panic(
expected = "SSE type provided in CompleteMultipartUpload response does not match the requested value"
)]
fn test_check_headers_bad_sse_type() {
let mut headers = Headers::new(&Allocator::default()).unwrap();
let header = Header::new(SSE_TYPE_HEADER_NAME, "wrong");
headers.add_header(&header).unwrap();
let header = Header::new(SSE_KEY_ID_HEADER_NAME, "some_key_alias");
headers.add_header(&header).unwrap();
check_response_headers(&headers, Some("sse:kms"), Some("some_key_alias"));
}

#[test]
#[should_panic(
expected = "SSE KMS key ID provided in CompleteMultipartUpload response does not match the requested value"
)]
fn test_check_headers_bad_sse_key() {
let mut headers = Headers::new(&Allocator::default()).unwrap();
let header = Header::new(SSE_TYPE_HEADER_NAME, "sse:kms");
headers.add_header(&header).unwrap();
check_response_headers(&headers, Some("sse:kms"), Some("some_key_alias"));
}
}
11 changes: 9 additions & 2 deletions mountpoint-s3-client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ pub fn get_test_bucket() -> String {
}
}

pub fn get_test_kms_key_id() -> String {
std::env::var("KMS_TEST_KEY_ID").expect("Set KMS_TEST_KEY_ID to run integration tests")
}

pub fn get_test_client() -> S3CrtClient {
let endpoint_config = EndpointConfig::new(&get_test_region());
S3CrtClient::new(S3ClientConfig::new().endpoint_config(endpoint_config)).expect("could not create test client")
Expand Down Expand Up @@ -190,6 +194,7 @@ macro_rules! object_client_test {
mod $test_fn_identifier {
use super::$test_fn_identifier;
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
use mountpoint_s3_client::types::PutObjectParams;
use $crate::{get_test_bucket_and_prefix, get_test_client};

#[tokio::test]
Expand All @@ -202,7 +207,8 @@ macro_rules! object_client_test {
unordered_list_seed: None,
});

$test_fn_identifier(&client, &bucket, &prefix).await;
let key = format!("{prefix}hello");
$test_fn_identifier(&client, &bucket, &key, PutObjectParams::new()).await;
}

#[tokio::test]
Expand All @@ -211,7 +217,8 @@ macro_rules! object_client_test {

let client = get_test_client();

$test_fn_identifier(&client, &bucket, &prefix).await;
let key = format!("{prefix}hello");
$test_fn_identifier(&client, &bucket, &key, PutObjectParams::new()).await;
}
}
};
Expand Down
Loading

0 comments on commit ae0f475

Please sign in to comment.