From 9d48a7207dc0feec9d2176e6e86f40350b0ddf84 Mon Sep 17 00:00:00 2001 From: Simon Beal <5381483+muddyfish@users.noreply.github.com> Date: Thu, 7 Nov 2024 15:28:42 +0000 Subject: [PATCH] Add GetObject support for object metadata (#1065) ## Description of change Adds support for fetching user defined object metadata in GetObject calls. Relevant issues: N/A ## Does this change impact existing behavior? No ## Does this change need a changelog entry in any of the crates? Yes, for mountpoint-s3-client. --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). --------- Signed-off-by: Simon Beal --- Cargo.lock | 7 + mountpoint-s3-client/Cargo.toml | 1 + mountpoint-s3-client/src/failure_client.rs | 13 +- mountpoint-s3-client/src/mock_client.rs | 36 ++++- .../src/mock_client/throughput_client.rs | 7 +- mountpoint-s3-client/src/object_client.rs | 19 ++- .../src/s3_crt_client/get_object.rs | 62 ++++++++- mountpoint-s3-client/tests/get_object.rs | 127 ++++++++++++++++++ 8 files changed, 253 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 175549693..5a44e84a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -368,6 +368,12 @@ dependencies = [ "syn 2.0.85", ] +[[package]] +name = "async_cell" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "834eee9ce518130a3b4d5af09ecc43e9d6b57ee76613f227a1ddd6b77c7a62bc" + [[package]] name = "atomic-waker" version = "1.1.2" @@ -2421,6 +2427,7 @@ dependencies = [ "async-io", "async-lock", "async-trait", + "async_cell", "auto_impl", "aws-config", "aws-credential-types", diff --git a/mountpoint-s3-client/Cargo.toml b/mountpoint-s3-client/Cargo.toml index 7a038c4dd..37ac854ad 100644 --- a/mountpoint-s3-client/Cargo.toml +++ b/mountpoint-s3-client/Cargo.toml @@ -11,6 +11,7 @@ description = "High-performance Amazon S3 client for Mountpoint for Amazon S3." mountpoint-s3-crt = { path = "../mountpoint-s3-crt", version = "0.10.0" } mountpoint-s3-crt-sys = { path = "../mountpoint-s3-crt-sys", version = "0.10.0" } +async_cell = "0.2.2" async-trait = "0.1.83" auto_impl = "1.2.0" base64ct = { version = "1.6.0", features = ["std"] } diff --git a/mountpoint-s3-client/src/failure_client.rs b/mountpoint-s3-client/src/failure_client.rs index 62dc20069..37f26ca86 100644 --- a/mountpoint-s3-client/src/failure_client.rs +++ b/mountpoint-s3-client/src/failure_client.rs @@ -18,8 +18,8 @@ use crate::object_client::{ CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, - ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, - PutObjectSingleParams, UploadReview, + ObjectClientError, ObjectClientResult, ObjectMetadata, PutObjectError, PutObjectParams, PutObjectRequest, + PutObjectResult, PutObjectSingleParams, UploadReview, }; // Wrapper for injecting failures into a get stream or a put request @@ -222,9 +222,16 @@ pub struct FailureGetRequest { request: Client::GetObjectRequest, } -impl GetObjectRequest for FailureGetRequest { +#[cfg_attr(not(docsrs), async_trait)] +impl GetObjectRequest + for FailureGetRequest +{ type ClientError = Client::ClientError; + async fn get_object_metadata(&self) -> ObjectClientResult { + self.request.get_object_metadata().await + } + fn increment_read_window(self: Pin<&mut Self>, len: usize) { let this = self.project(); this.request.increment_read_window(len); diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs index 485e107be..faa8eebc4 100644 --- a/mountpoint-s3-client/src/mock_client.rs +++ b/mountpoint-s3-client/src/mock_client.rs @@ -30,8 +30,8 @@ use crate::object_client::{ DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult, - ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams, - PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart, + ObjectInfo, ObjectMetadata, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, + PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart, }; mod leaky_bucket; @@ -530,9 +530,14 @@ impl MockGetObjectRequest { } } +#[cfg_attr(not(docsrs), async_trait)] impl GetObjectRequest for MockGetObjectRequest { type ClientError = MockClientError; + async fn get_object_metadata(&self) -> ObjectClientResult { + Ok(self.object.object_metadata.clone()) + } + fn increment_read_window(mut self: Pin<&mut Self>, len: usize) { self.read_window_end_offset += len as u64; } @@ -1061,7 +1066,12 @@ mod tests { }; } - async fn test_get_object(key: &str, size: usize, range: Option>) { + async fn test_get_object( + key: &str, + size: usize, + range: Option>, + object_metadata: HashMap, + ) { let mut rng = ChaChaRng::seed_from_u64(0x12345678); let client = MockClient::new(MockClientConfig { @@ -1073,7 +1083,10 @@ mod tests { let mut body = vec![0u8; size]; rng.fill_bytes(&mut body); - client.add_object(key, MockObject::from_bytes(&body, ETag::for_tests())); + + let mut object = MockObject::from_bytes(&body, ETag::for_tests()); + object.set_object_metadata(object_metadata.clone()); + client.add_object(key, object); let mut get_request = client .get_object("test_bucket", key, range.clone(), None) @@ -1091,13 +1104,22 @@ mod tests { let expected_range = range.unwrap_or(0..size as u64); let expected_range = expected_range.start as usize..expected_range.end as usize; assert_eq!(&accum[..], &body[expected_range], "body does not match"); + + assert_eq!(get_request.get_object_metadata().await, Ok(object_metadata)); } #[tokio::test] async fn get_object() { - test_get_object("key1", 2000, None).await; - test_get_object("key1", 9000, Some(50..2000)).await; - test_get_object("key1", 10, Some(0..10)).await; + test_get_object("key1", 2000, None, Default::default()).await; + test_get_object("key1", 9000, Some(50..2000), Default::default()).await; + test_get_object("key1", 10, Some(0..10), Default::default()).await; + test_get_object( + "key1", + 10, + None, + HashMap::from([("foo".to_string(), "bar".to_string())]), + ) + .await; } async fn test_get_object_backpressure( diff --git a/mountpoint-s3-client/src/mock_client/throughput_client.rs b/mountpoint-s3-client/src/mock_client/throughput_client.rs index 39958e768..5d6b3ba4d 100644 --- a/mountpoint-s3-client/src/mock_client/throughput_client.rs +++ b/mountpoint-s3-client/src/mock_client/throughput_client.rs @@ -17,7 +17,7 @@ use crate::object_client::{ CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, - ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult, PutObjectSingleParams, + ObjectClientResult, ObjectMetadata, PutObjectError, PutObjectParams, PutObjectResult, PutObjectSingleParams, }; /// A [MockClient] that rate limits overall download throughput to simulate a target network @@ -66,9 +66,14 @@ pub struct ThroughputGetObjectRequest { rate_limiter: LeakyBucket, } +#[cfg_attr(not(docsrs), async_trait)] impl GetObjectRequest for ThroughputGetObjectRequest { type ClientError = MockClientError; + async fn get_object_metadata(&self) -> ObjectClientResult { + Ok(self.request.object.object_metadata.clone()) + } + fn increment_read_window(self: Pin<&mut Self>, len: usize) { let this = self.project(); this.request.increment_read_window(len); diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs index 588968eb0..69f8d3276 100644 --- a/mountpoint-s3-client/src/object_client.rs +++ b/mountpoint-s3-client/src/object_client.rs @@ -143,7 +143,7 @@ pub trait ObjectClient { /// /// [`ServiceError`]: ObjectClientError::ServiceError /// [`ClientError`]: ObjectClientError::ClientError -#[derive(Debug, Error)] +#[derive(Debug, Error, PartialEq)] pub enum ObjectClientError { /// An error returned by the service itself #[error("Service error")] @@ -357,6 +357,8 @@ pub enum GetObjectAttributesError { NoSuchKey, } +pub type ObjectMetadata = HashMap; + /// Parameters to a [`put_object`](ObjectClient::put_object) request #[derive(Debug, Default, Clone)] #[non_exhaustive] @@ -373,7 +375,7 @@ pub struct PutObjectParams { /// Custom headers to add to the request pub custom_headers: Vec<(String, String)>, /// User-defined object metadata - pub object_metadata: HashMap, + pub object_metadata: ObjectMetadata, } impl PutObjectParams { @@ -413,7 +415,7 @@ impl PutObjectParams { } /// Set user defined object metadata. - pub fn object_metadata(mut self, value: HashMap) -> Self { + pub fn object_metadata(mut self, value: ObjectMetadata) -> Self { self.object_metadata = value; self } @@ -456,7 +458,7 @@ pub struct PutObjectSingleParams { /// Custom headers to add to the request pub custom_headers: Vec<(String, String)>, /// User-defined object metadata - pub object_metadata: HashMap, + pub object_metadata: ObjectMetadata, } impl PutObjectSingleParams { @@ -496,7 +498,7 @@ impl PutObjectSingleParams { } /// Set user defined object metadata. - pub fn object_metadata(mut self, value: HashMap) -> Self { + pub fn object_metadata(mut self, value: ObjectMetadata) -> Self { self.object_metadata = value; self } @@ -525,10 +527,15 @@ impl UploadChecksum { /// object. #[cfg_attr(not(docsrs), async_trait)] pub trait GetObjectRequest: - Stream> + Send + Stream> + Send + Sync { type ClientError: std::error::Error + Send + Sync + 'static; + /// Get the object's user defined metadata. + /// If the metadata has already been read, return immediately. Otherwise, resolve the future + /// when they're read. + async fn get_object_metadata(&self) -> ObjectClientResult; + /// Increment the flow-control window, so that response data continues downloading. /// /// If the client was created with `enable_read_backpressure` set true, diff --git a/mountpoint-s3-client/src/s3_crt_client/get_object.rs b/mountpoint-s3-client/src/s3_crt_client/get_object.rs index 370de2744..baf9566e7 100644 --- a/mountpoint-s3-client/src/s3_crt_client/get_object.rs +++ b/mountpoint-s3-client/src/s3_crt_client/get_object.rs @@ -1,8 +1,11 @@ +use async_cell::sync::AsyncCell; +use async_trait::async_trait; use std::future::Future; use std::ops::Deref; use std::ops::Range; use std::os::unix::prelude::OsStrExt; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use futures::channel::mpsc::UnboundedReceiver; @@ -11,12 +14,20 @@ use mountpoint_s3_crt::common::error::Error; use mountpoint_s3_crt::http::request_response::Header; use mountpoint_s3_crt::s3::client::MetaRequestResult; use pin_project::pin_project; +use thiserror::Error; -use crate::object_client::{ETag, GetBodyPart, GetObjectError, ObjectClientError, ObjectClientResult}; +use crate::object_client::{ETag, GetBodyPart, GetObjectError, ObjectClientError, ObjectClientResult, ObjectMetadata}; use crate::s3_crt_client::{ GetObjectRequest, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Operation, S3RequestError, }; +/// Failures to return object metadata +#[derive(Clone, Error, Debug)] +pub enum ObjectMetadataError { + #[error("error occurred fetching object metadata")] + ObjectMetadataError, +} + impl S3CrtClient { /// Create and begin a new GetObject request. The returned [GetObjectRequest] is a [Stream] of /// body parts of the object, which will be delivered in order. @@ -67,15 +78,44 @@ impl S3CrtClient { let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::GetObject); options.part_size(self.inner.read_part_size as u64); + + let object_metadata = AsyncCell::shared(); + + let object_metadata_setter_on_headers = object_metadata.clone(); + let object_metadata_setter_on_finish = object_metadata.clone(); + let request = self.inner.make_meta_request_from_options( options, span, |_| (), - |_, _| (), + move |headers, status| { + // Headers can be returned multiple times, but the object metadata doesn't change. + // Explicitly ignore the case where we've already set object metadata. + + // Only set metadata if we have a 2xx status code. If we only get other status + // codes, then on_finish cancels. + if (200..300).contains(&status) { + // This isn't to do with safety, only minor performance gains. + if !object_metadata_setter_on_headers.is_set() { + let object_metadata = headers + .iter() + .filter_map(|(key, value)| { + let metadata_header = key.to_str()?.strip_prefix("x-amz-meta-")?; + let value = value.to_str()?; + Some((metadata_header.to_string(), value.to_string())) + }) + .collect(); + // Don't overwrite if already set. + object_metadata_setter_on_headers.or_set(Ok(object_metadata)); + } + } + }, move |offset, data| { let _ = sender.unbounded_send(Ok((offset, data.into()))); }, move |result| { + // FIXME - Ideally we'd include a reason why we failed here. + object_metadata_setter_on_finish.or_set(Err(ObjectMetadataError::ObjectMetadataError)); if result.is_err() { Err(parse_get_object_error(result).map(ObjectClientError::ServiceError)) } else { @@ -89,6 +129,8 @@ impl S3CrtClient { finish_receiver: receiver, finished: false, enable_backpressure: self.inner.enable_backpressure, + object_metadata, + initial_read_window_empty: self.inner.initial_read_window_size == 0, next_offset, read_window_end_offset, }) @@ -109,6 +151,8 @@ pub struct S3GetObjectRequest { finish_receiver: UnboundedReceiver>, finished: bool, enable_backpressure: bool, + object_metadata: Arc>>, + initial_read_window_empty: bool, /// Next offset of the data to be polled from [poll_next] next_offset: u64, /// Upper bound of the current read window. When backpressure is enabled, [S3GetObjectRequest] @@ -116,9 +160,23 @@ pub struct S3GetObjectRequest { read_window_end_offset: u64, } +#[cfg_attr(not(docsrs), async_trait)] impl GetObjectRequest for S3GetObjectRequest { type ClientError = S3RequestError; + async fn get_object_metadata(&self) -> ObjectClientResult { + match self.object_metadata.try_get() { + Some(result) => result, + None => { + if self.enable_backpressure && self.initial_read_window_empty { + return Err(ObjectClientError::ClientError(S3RequestError::EmptyReadWindow)); + } + self.object_metadata.get().await + } + } + .map_err(|_| ObjectClientError::ClientError(S3RequestError::RequestCanceled)) + } + fn increment_read_window(mut self: Pin<&mut Self>, len: usize) { self.read_window_end_offset += len as u64; self.request.meta_request.increment_read_window(len as u64); diff --git a/mountpoint-s3-client/tests/get_object.rs b/mountpoint-s3-client/tests/get_object.rs index c63387094..2ec30007f 100644 --- a/mountpoint-s3-client/tests/get_object.rs +++ b/mountpoint-s3-client/tests/get_object.rs @@ -2,6 +2,7 @@ pub mod common; +use std::collections::HashMap; use std::ops::Range; use std::option::Option::None; use std::str::FromStr; @@ -338,3 +339,129 @@ async fn test_get_object_cancel(read: bool) { // were actually cancelled, but we can at least check that the drop doesn't panic/deadlock. drop(request); } + +#[test_case(1, HashMap::from([("foo".to_string(), "bar".to_string())]); "1-byte object with metadata")] +#[test_case(10, HashMap::from([("foo".to_string(), "bar".to_string())]); "small object with metadata")] +#[test_case(30000000, HashMap::from([("foo".to_string(), "bar".to_string())]); "large object with metadata")] +#[tokio::test] +async fn test_get_object_user_metadata(size: usize, metadata: HashMap) { + let sdk_client = get_test_sdk_client().await; + let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object_user_metadata"); + + let key = format!("{prefix}/test"); + let body = vec![0x42; size]; + sdk_client + .put_object() + .bucket(&bucket) + .key(&key) + .set_metadata(Some(metadata.clone())) + .body(ByteStream::from(body.clone())) + .send() + .await + .unwrap(); + + let client: S3CrtClient = get_test_client(); + + let result = client + .get_object(&bucket, &key, None, None) + .await + .expect("get_object should succeed"); + let actual_metadata = result.get_object_metadata().await.expect("should return metadata"); + let actual_metadata_2 = result + .get_object_metadata() + .await + .expect("should return metadata multiple times"); + + pin_mut!(result); + let expected = &body; + check_get_result(result, None, expected).await; + assert_eq!(actual_metadata, metadata); + assert_eq!(actual_metadata_2, metadata); +} + +#[test_case(50, HashMap::from([("foo".to_string(), "bar".to_string())]); "50-byte object with metadata")] +#[tokio::test] +async fn test_get_object_user_metadata_with_zero_backpressure(size: usize, metadata: HashMap) { + let sdk_client = get_test_sdk_client().await; + let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object_user_metadata_with_zero_backpressure"); + + let key = format!("{prefix}/test"); + let body = vec![0x42; size]; + sdk_client + .put_object() + .bucket(&bucket) + .key(&key) + .set_metadata(Some(metadata.clone())) + .body(ByteStream::from(body.clone())) + .send() + .await + .unwrap(); + + let client: S3CrtClient = get_test_backpressure_client(0, None); + + let result = client + .get_object(&bucket, &key, Some(1..5), None) + .await + .expect("get_object should succeed"); + result + .get_object_metadata() + .await + .expect_err("should not return metadata for empty read window"); +} + +#[tokio::test] +async fn test_get_object_metadata_404() { + let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object_metadata_404"); + + let key = format!("{prefix}/test"); + + let client: S3CrtClient = get_test_client(); + + let result = client + .get_object(&bucket, &key, None, None) + .await + .expect("get_object should succeed"); + result + .get_object_metadata() + .await + .expect_err("should not return metadata"); +} + +#[test_case(1, HashMap::from([("foo".to_string(), "bar".to_string())]); "1-byte object with metadata")] +#[test_case(10, HashMap::from([("foo".to_string(), "bar".to_string())]); "small object with metadata")] +#[test_case(30000000, HashMap::from([("foo".to_string(), "bar".to_string())]); "large object with metadata")] +#[tokio::test] +async fn test_get_object_user_metadata_after_stream(size: usize, metadata: HashMap) { + let sdk_client = get_test_sdk_client().await; + let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object_user_metadata"); + + let key = format!("{prefix}/test"); + let body = vec![0x42; size]; + sdk_client + .put_object() + .bucket(&bucket) + .key(&key) + .set_metadata(Some(metadata.clone())) + .body(ByteStream::from(body.clone())) + .send() + .await + .unwrap(); + + let client: S3CrtClient = get_test_client(); + + let result = client + .get_object(&bucket, &key, None, None) + .await + .expect("get_object should succeed"); + + pin_mut!(result); + while let Some(r) = result.next().await { + let _ = r.expect("get_object body part failed"); + } + let actual_metadata = result + .as_ref() + .get_object_metadata() + .await + .expect("should return metadata"); + assert_eq!(actual_metadata, metadata); +}