From b90a261c2ba296a3263ec0ffb0df495365bb3b6c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 27 Feb 2025 18:47:09 +0100 Subject: [PATCH] feat: add `Extensions` to object store `PutMultipartOpts` (#7214) --- object_store/src/aws/client.rs | 11 +++++++++-- object_store/src/azure/client.rs | 11 +++++++++-- object_store/src/buffered.rs | 17 +++++++++++++++++ object_store/src/gcp/client.rs | 10 +++++++++- object_store/src/lib.rs | 27 ++++++++++++++++++++++++++- 5 files changed, 70 insertions(+), 6 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 6cf5540000b7..fb2a033c3b46 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -633,6 +633,12 @@ impl S3Client { location: &Path, opts: PutMultipartOpts, ) -> Result { + let PutMultipartOpts { + tags, + attributes, + extensions, + } = opts; + let mut request = self.request(Method::POST, location); if let Some(algorithm) = self.config.checksum { match algorithm { @@ -644,8 +650,9 @@ impl S3Client { let response = request .query(&[("uploads", "")]) .with_encryption_headers() - .with_attributes(opts.attributes) - .with_tags(opts.tags) + .with_attributes(attributes) + .with_tags(tags) + .with_extensions(extensions) .idempotent(true) .send() .await? diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index c4d026bcb8a7..dbeae634608e 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -599,6 +599,12 @@ impl AzureClient { parts: Vec, opts: PutMultipartOpts, ) -> Result { + let PutMultipartOpts { + tags, + attributes, + extensions, + } = opts; + let blocks = parts .into_iter() .map(|part| BlockId::from(part.content_id)) @@ -607,8 +613,9 @@ impl AzureClient { let payload = BlockList { blocks }.to_xml().into(); let response = self .put_request(path, payload) - .with_attributes(opts.attributes) - .with_tags(opts.tags) + .with_attributes(attributes) + .with_tags(tags) + .with_extensions(extensions) .query(&[("comp", "blocklist")]) .idempotent(true) .send() diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index fcd7e064e7c1..a767cb65c91f 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -222,6 +222,7 @@ pub struct BufWriter { max_concurrency: usize, attributes: Option, tags: Option, + extensions: Option<::http::Extensions>, state: BufWriterState, store: Arc, } @@ -259,6 +260,7 @@ impl BufWriter { max_concurrency: 8, attributes: None, tags: None, + extensions: None, state: BufWriterState::Buffer(path, PutPayloadMut::new()), } } @@ -289,6 +291,19 @@ impl BufWriter { } } + /// Set the extensions of the uploaded object + /// + /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations + /// that need to pass context-specific information (like tracing spans) via trait methods. + /// + /// These extensions are ignored entirely by backends offered through this crate. + pub fn with_extensions(self, extensions: ::http::Extensions) -> Self { + Self { + extensions: Some(extensions), + ..self + } + } + /// Write data to the writer in [`Bytes`]. /// /// Unlike [`AsyncWrite::poll_write`], `put` can write data without extra copying. @@ -325,6 +340,7 @@ impl BufWriter { let opts = PutMultipartOpts { attributes: self.attributes.take().unwrap_or_default(), tags: self.tags.take().unwrap_or_default(), + extensions: self.extensions.take().unwrap_or_default(), }; let upload = self.store.put_multipart_opts(&path, opts).await?; let mut chunked = @@ -384,6 +400,7 @@ impl AsyncWrite for BufWriter { let opts = PutMultipartOpts { attributes: self.attributes.take().unwrap_or_default(), tags: self.tags.take().unwrap_or_default(), + extensions: self.extensions.take().unwrap_or_default(), }; let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index e514624f8f71..1cc72964f82c 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -453,9 +453,17 @@ impl GoogleCloudStorageClient { path: &Path, opts: PutMultipartOpts, ) -> Result { + let PutMultipartOpts { + // not supported by GCP + tags: _, + attributes, + extensions, + } = opts; + let response = self .request(Method::POST, path) - .with_attributes(opts.attributes) + .with_attributes(attributes) + .with_extensions(extensions) .header(&CONTENT_LENGTH, "0") .query(&[("uploads", "")]) .send() diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 8f05fb391194..5db7e01d7d89 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1223,7 +1223,7 @@ impl From for PutOptions { } /// Options for [`ObjectStore::put_multipart_opts`] -#[derive(Debug, Clone, PartialEq, Eq, Default)] +#[derive(Debug, Clone, Default)] pub struct PutMultipartOpts { /// Provide a [`TagSet`] for this object /// @@ -1233,8 +1233,33 @@ pub struct PutMultipartOpts { /// /// Implementations that don't support an attribute should return an error pub attributes: Attributes, + /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations + /// that need to pass context-specific information (like tracing spans) via trait methods. + /// + /// These extensions are ignored entirely by backends offered through this crate. + /// + /// They are also eclused from [`PartialEq`] and [`Eq`]. + pub extensions: ::http::Extensions, +} + +impl PartialEq for PutMultipartOpts { + fn eq(&self, other: &Self) -> bool { + let Self { + tags, + attributes, + extensions: _, + } = self; + let Self { + tags: other_tags, + attributes: other_attributes, + extensions: _, + } = other; + (tags == other_tags) && (attributes == other_attributes) + } } +impl Eq for PutMultipartOpts {} + impl From for PutMultipartOpts { fn from(tags: TagSet) -> Self { Self {