Skip to content

Commit

Permalink
Make reqwest an optional feature
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Feb 26, 2025
1 parent e916df1 commit 0f35094
Show file tree
Hide file tree
Showing 23 changed files with 137 additions and 93 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ jobs:
rustup default stable
- name: Run object_store tests
run: cargo test --features=aws,azure,gcp,http
run: cargo test --features=aws,azure,gcp,http,reqwest

- name: Run object_store tests (AWS native conditional put)
run: cargo test --features=aws
run: cargo test --features=aws,reqwest
env:
AWS_CONDITIONAL_PUT: etag
AWS_COPY_IF_NOT_EXISTS: multipart
Expand Down
6 changes: 4 additions & 2 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ walkdir = { version = "2", optional = true }
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
form_urlencoded = { version = "1.2", optional = true }
http = { version = "1.2.0", optional = true }
http-body = { version = "1.0.1", optional = true}
http-body-util = { version = "0.1", optional = true }
httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }
hyper = { version = "1.2", default-features = false, optional = true }
md-5 = { version = "0.10.6", default-features = false, optional = true }
quick-xml = { version = "0.37.0", features = ["serialize", "overlapped-lists"], optional = true }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2", "stream"], optional = true }
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
Expand All @@ -66,7 +67,8 @@ nix = { version = "0.29.0", features = ["fs"] }

[features]
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "dep:http", "http-body-util", "form_urlencoded", "serde_urlencoded"]
reqwest = ["dep:reqwest", "hyper"]
cloud = ["serde", "serde_json", "quick-xml", "chrono/serde", "base64", "rand", "ring", "dep:http", "http-body", "http-body-util", "form_urlencoded", "serde_urlencoded"]
azure = ["cloud", "httparse"]
fs = ["walkdir"]
gcp = ["cloud", "rustls-pemfile"]
Expand Down
11 changes: 6 additions & 5 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use crate::aws::{
AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists,
STORE,
};
use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider};
use crate::client::{default_connector, HttpConnector, TokenCredentialProvider};
use crate::config::ConfigValue;
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use http::{HeaderMap, HeaderValue};
use itertools::Itertools;
use md5::{Digest, Md5};
use reqwest::header::{HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -897,9 +897,10 @@ impl AmazonS3Builder {
self.parse_url(&url)?;
}

let http = self
.http_connector
.unwrap_or_else(|| Arc::new(ReqwestConnector::default()));
let http = match self.http_connector {
Some(connector) => connector,
None => default_connector()?,
};

let bucket = self.bucket_name.ok_or(Error::MissingBucketName)?;
let region = self.region.unwrap_or_else(|| "us-east-1".to_string());
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ struct CreateSessionOutput {
credentials: SessionCredentials,
}

#[cfg(test)]
#[cfg(all(test, feature = "reqwest"))]
mod tests {
use super::*;
use crate::client::mock_server::MockServer;
Expand Down
12 changes: 8 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use http::header::{IF_MATCH, IF_NONE_MATCH};
use http::{HeaderName, Method, StatusCode};
use std::{sync::Arc, time::Duration};
use url::Url;

Expand All @@ -58,12 +58,16 @@ mod client;
mod credential;
mod dynamo;
mod precondition;
mod resolve;

pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
pub use checksum::Checksum;
pub use dynamo::DynamoCommit;
pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};

#[cfg(feature = "reqwest")]
mod resolve;

#[cfg(feature = "reqwest")]
pub use resolve::resolve_bucket_region;

/// This struct is used to maintain the URI path encoding
Expand Down Expand Up @@ -115,7 +119,7 @@ impl Signer for AmazonS3 {
/// ```
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # use object_store::{aws::AmazonS3Builder, path::Path, signer::Signer};
/// # use reqwest::Method;
/// # use http::Method;
/// # use std::time::Duration;
/// #
/// let region = "us-east-1";
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/aws/precondition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub enum S3CopyIfNotExists {
/// other than 412.
///
/// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` ignoring whitespace
HeaderWithStatus(String, String, reqwest::StatusCode),
HeaderWithStatus(String, String, http::StatusCode),
/// Native Amazon S3 supports copy if not exists through a multipart upload
/// where the upload copies an existing object and is completed only if the
/// new object does not already exist.
Expand Down
9 changes: 5 additions & 4 deletions object_store/src/azure/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::azure::credential::{
ImdsManagedIdentityProvider, WorkloadIdentityOAuthProvider,
};
use crate::azure::{AzureCredential, AzureCredentialProvider, MicrosoftAzure, STORE};
use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider};
use crate::client::{default_connector, HttpConnector, TokenCredentialProvider};
use crate::config::ConfigValue;
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
use percent_encoding::percent_decode_str;
Expand Down Expand Up @@ -907,9 +907,10 @@ impl MicrosoftAzureBuilder {
Arc::new(StaticCredentialProvider::new(credential))
};

let http = self
.http_connector
.unwrap_or_else(|| Arc::new(ReqwestConnector::default()));
let http = match self.http_connector {
Some(connector) => connector,
None => default_connector()?,
};

let (is_emulator, storage_url, auth, account) = if self.use_emulator.get()? {
let account_name = self
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,7 @@ pub(crate) struct UserDelegationKey {
pub value: String,
}

#[cfg(test)]
#[cfg(all(test, feature = "reqwest"))]
mod tests {
use super::*;
use crate::StaticCredentialProvider;
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ impl CredentialProvider for AzureCliCredential {
}
}

#[cfg(test)]
#[cfg(all(test, feature = "reqwest"))]
mod tests {
use futures::executor::block_on;
use http::{Response, StatusCode};
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
};
use async_trait::async_trait;
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use reqwest::Method;
use http::Method;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -170,7 +170,7 @@ impl Signer for MicrosoftAzure {
/// ```
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # use object_store::{azure::MicrosoftAzureBuilder, path::Path, signer::Signer};
/// # use reqwest::Method;
/// # use http::Method;
/// # use std::time::Duration;
/// #
/// let azure = MicrosoftAzureBuilder::new()
Expand Down
3 changes: 2 additions & 1 deletion object_store/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use crate::{collect_bytes, PutPayload};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use http_body::{Body, Frame, SizeHint};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body, Frame, SizeHint};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -39,6 +39,7 @@ impl HttpRequestBody {
Self(Inner::Bytes(Bytes::new()))
}

#[cfg(feature = "reqwest")]
pub(crate) fn into_reqwest(self) -> reqwest::Body {
match self.0 {
Inner::Bytes(b) => b.into(),
Expand Down
83 changes: 53 additions & 30 deletions object_store/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

use crate::client::body::{HttpRequest, HttpResponse};
use crate::client::builder::{HttpRequestBuilder, RequestBuilderError};
use crate::client::HttpResponseBody;
use crate::ClientOptions;
use async_trait::async_trait;
use http::{Method, Uri};
use http_body_util::BodyExt;
use std::error::Error;
use std::sync::Arc;

Expand Down Expand Up @@ -83,7 +81,8 @@ impl HttpError {
}
}

pub(crate) fn reqwest(e: reqwest::Error) -> Self {
#[cfg(feature = "reqwest")]
pub(crate) fn reqwest(e: ::reqwest::Error) -> Self {
let mut kind = if e.is_timeout() {
HttpErrorKind::Timeout
} else if e.is_connect() {
Expand Down Expand Up @@ -199,39 +198,63 @@ impl HttpClient {
}
}

#[async_trait]
impl HttpService for reqwest::Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
let (parts, body) = req.into_parts();

let url = parts.uri.to_string().parse().unwrap();
let mut req = reqwest::Request::new(parts.method, url);
*req.headers_mut() = parts.headers;
*req.body_mut() = Some(body.into_reqwest());

let r = self.execute(req).await.map_err(HttpError::reqwest)?;
let res: http::Response<reqwest::Body> = r.into();
let (parts, body) = res.into_parts();

let body = HttpResponseBody::new(body.map_err(HttpError::reqwest));
Ok(HttpResponse::from_parts(parts, body))
}
}

/// A factory for [`HttpClient`]
pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
/// Create a new [`HttpClient`] with the provided [`ClientOptions`]
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient>;
}

/// [`HttpConnector`] using [`reqwest::Client`]
#[derive(Debug, Default)]
#[allow(missing_copy_implementations)]
pub struct ReqwestConnector {}
#[cfg(feature = "reqwest")]
pub(crate) fn default_connector() -> crate::Result<Arc<dyn HttpConnector>> {
Ok(Arc::new(ReqwestConnector::default()))
}

#[cfg(not(feature = "reqwest"))]
pub(crate) fn default_connector() -> crate::Result<Arc<dyn HttpConnector>> {
const MSG: &str = "No HTTP Client, either enable reqwest feature or override HttpConnector";

Err(crate::Error::NotSupported {
source: MSG.to_string().into(),
})
}

#[cfg(feature = "reqwest")]
mod reqwest {
use super::*;
use crate::client::HttpResponseBody;
use ::reqwest::{Client, Request};
use http_body_util::BodyExt;
#[async_trait]
impl HttpService for Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
let (parts, body) = req.into_parts();

let url = parts.uri.to_string().parse().unwrap();
let mut req = Request::new(parts.method, url);
*req.headers_mut() = parts.headers;
*req.body_mut() = Some(body.into_reqwest());

let r = self.execute(req).await.map_err(HttpError::reqwest)?;
let res: http::Response<_> = r.into();
let (parts, body) = res.into_parts();

let body = HttpResponseBody::new(body.map_err(HttpError::reqwest));
Ok(HttpResponse::from_parts(parts, body))
}
}

/// [`HttpConnector`] using [`reqwest::Client`]
#[derive(Debug, Default)]
#[allow(missing_copy_implementations)]
pub struct ReqwestConnector {}

impl HttpConnector for ReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let client = options.client()?;
Ok(HttpClient::new(client))
impl HttpConnector for ReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let client = options.client()?;
Ok(HttpClient::new(client))
}
}
}

#[cfg(feature = "reqwest")]
pub use reqwest::*;
5 changes: 2 additions & 3 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPay
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use http::header::{
CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_RANGE,
CONTENT_TYPE,
ToStrError, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_RANGE, CONTENT_TYPE,
};
use http::StatusCode;
use reqwest::header::ToStrError;

/// A client that can perform a get request
#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub(crate) enum Error {
MissingEtag,

#[error("Received header containing non-ASCII data")]
BadHeader { source: reqwest::header::ToStrError },
BadHeader { source: http::header::ToStrError },

#[error("Last-Modified Header missing from response")]
MissingLastModified,
Expand Down
Loading

0 comments on commit 0f35094

Please sign in to comment.