Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make reqwest an optional feature #7203

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ jobs:
rustup default stable
- name: Run object_store tests
run: cargo test --features=aws,azure,gcp,http
run: cargo test --features=aws,azure,gcp,http,reqwest

- name: Run object_store tests (AWS native conditional put)
run: cargo test --features=aws
run: cargo test --features=aws,reqwest
env:
AWS_CONDITIONAL_PUT: etag
AWS_COPY_IF_NOT_EXISTS: multipart
Expand Down
6 changes: 4 additions & 2 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ walkdir = { version = "2", optional = true }
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
form_urlencoded = { version = "1.2", optional = true }
http = { version = "1.2.0", optional = true }
http-body = { version = "1.0.1", optional = true}
http-body-util = { version = "0.1", optional = true }
httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }
hyper = { version = "1.2", default-features = false, optional = true }
md-5 = { version = "0.10.6", default-features = false, optional = true }
quick-xml = { version = "0.37.0", features = ["serialize", "overlapped-lists"], optional = true }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2", "stream"], optional = true }
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
Expand All @@ -66,7 +67,8 @@ nix = { version = "0.29.0", features = ["fs"] }

[features]
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "dep:http", "http-body-util", "form_urlencoded", "serde_urlencoded"]
reqwest = ["dep:reqwest", "hyper"]
cloud = ["serde", "serde_json", "quick-xml", "chrono/serde", "base64", "rand", "ring", "dep:http", "http-body", "http-body-util", "form_urlencoded", "serde_urlencoded"]
azure = ["cloud", "httparse"]
fs = ["walkdir"]
gcp = ["cloud", "rustls-pemfile"]
Expand Down
11 changes: 6 additions & 5 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use crate::aws::{
AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists,
STORE,
};
use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider};
use crate::client::{default_connector, HttpConnector, TokenCredentialProvider};
use crate::config::ConfigValue;
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use http::{HeaderMap, HeaderValue};
use itertools::Itertools;
use md5::{Digest, Md5};
use reqwest::header::{HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -897,9 +897,10 @@ impl AmazonS3Builder {
self.parse_url(&url)?;
}

let http = self
.http_connector
.unwrap_or_else(|| Arc::new(ReqwestConnector::default()));
let http = match self.http_connector {
Some(connector) => connector,
None => default_connector()?,
};

let bucket = self.bucket_name.ok_or(Error::MissingBucketName)?;
let region = self.region.unwrap_or_else(|| "us-east-1".to_string());
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ struct CreateSessionOutput {
credentials: SessionCredentials,
}

#[cfg(test)]
#[cfg(all(test, feature = "reqwest"))]
mod tests {
use super::*;
use crate::client::mock_server::MockServer;
Expand Down
12 changes: 8 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use http::header::{IF_MATCH, IF_NONE_MATCH};
use http::{HeaderName, Method, StatusCode};
use std::{sync::Arc, time::Duration};
use url::Url;

Expand All @@ -58,12 +58,16 @@ mod client;
mod credential;
mod dynamo;
mod precondition;
mod resolve;

pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
pub use checksum::Checksum;
pub use dynamo::DynamoCommit;
pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};

#[cfg(feature = "reqwest")]
mod resolve;

#[cfg(feature = "reqwest")]
pub use resolve::resolve_bucket_region;

/// This struct is used to maintain the URI path encoding
Expand Down Expand Up @@ -115,7 +119,7 @@ impl Signer for AmazonS3 {
/// ```
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # use object_store::{aws::AmazonS3Builder, path::Path, signer::Signer};
/// # use reqwest::Method;
/// # use http::Method;
/// # use std::time::Duration;
/// #
/// let region = "us-east-1";
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/aws/precondition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub enum S3CopyIfNotExists {
/// other than 412.
///
/// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` ignoring whitespace
HeaderWithStatus(String, String, reqwest::StatusCode),
HeaderWithStatus(String, String, http::StatusCode),
/// Native Amazon S3 supports copy if not exists through a multipart upload
/// where the upload copies an existing object and is completed only if the
/// new object does not already exist.
Expand Down
9 changes: 5 additions & 4 deletions object_store/src/azure/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::azure::credential::{
ImdsManagedIdentityProvider, WorkloadIdentityOAuthProvider,
};
use crate::azure::{AzureCredential, AzureCredentialProvider, MicrosoftAzure, STORE};
use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider};
use crate::client::{default_connector, HttpConnector, TokenCredentialProvider};
use crate::config::ConfigValue;
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
use percent_encoding::percent_decode_str;
Expand Down Expand Up @@ -907,9 +907,10 @@ impl MicrosoftAzureBuilder {
Arc::new(StaticCredentialProvider::new(credential))
};

let http = self
.http_connector
.unwrap_or_else(|| Arc::new(ReqwestConnector::default()));
let http = match self.http_connector {
Some(connector) => connector,
None => default_connector()?,
};

let (is_emulator, storage_url, auth, account) = if self.use_emulator.get()? {
let account_name = self
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,7 @@ pub(crate) struct UserDelegationKey {
pub value: String,
}

#[cfg(test)]
#[cfg(all(test, feature = "reqwest"))]
mod tests {
use super::*;
use crate::StaticCredentialProvider;
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/azure/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ impl CredentialProvider for AzureCliCredential {
}
}

#[cfg(test)]
#[cfg(all(test, feature = "reqwest"))]
mod tests {
use futures::executor::block_on;
use http::{Response, StatusCode};
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
};
use async_trait::async_trait;
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use reqwest::Method;
use http::Method;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -170,7 +170,7 @@ impl Signer for MicrosoftAzure {
/// ```
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # use object_store::{azure::MicrosoftAzureBuilder, path::Path, signer::Signer};
/// # use reqwest::Method;
/// # use http::Method;
/// # use std::time::Duration;
/// #
/// let azure = MicrosoftAzureBuilder::new()
Expand Down
3 changes: 2 additions & 1 deletion object_store/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use crate::{collect_bytes, PutPayload};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use http_body::{Body, Frame, SizeHint};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body, Frame, SizeHint};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -39,6 +39,7 @@ impl HttpRequestBody {
Self(Inner::Bytes(Bytes::new()))
}

#[cfg(feature = "reqwest")]
pub(crate) fn into_reqwest(self) -> reqwest::Body {
match self.0 {
Inner::Bytes(b) => b.into(),
Expand Down
83 changes: 53 additions & 30 deletions object_store/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

use crate::client::body::{HttpRequest, HttpResponse};
use crate::client::builder::{HttpRequestBuilder, RequestBuilderError};
use crate::client::HttpResponseBody;
use crate::ClientOptions;
use async_trait::async_trait;
use http::{Method, Uri};
use http_body_util::BodyExt;
use std::error::Error;
use std::sync::Arc;

Expand Down Expand Up @@ -83,7 +81,8 @@ impl HttpError {
}
}

pub(crate) fn reqwest(e: reqwest::Error) -> Self {
#[cfg(feature = "reqwest")]
pub(crate) fn reqwest(e: ::reqwest::Error) -> Self {
let mut kind = if e.is_timeout() {
HttpErrorKind::Timeout
} else if e.is_connect() {
Expand Down Expand Up @@ -199,39 +198,63 @@ impl HttpClient {
}
}

#[async_trait]
impl HttpService for reqwest::Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
let (parts, body) = req.into_parts();

let url = parts.uri.to_string().parse().unwrap();
let mut req = reqwest::Request::new(parts.method, url);
*req.headers_mut() = parts.headers;
*req.body_mut() = Some(body.into_reqwest());

let r = self.execute(req).await.map_err(HttpError::reqwest)?;
let res: http::Response<reqwest::Body> = r.into();
let (parts, body) = res.into_parts();

let body = HttpResponseBody::new(body.map_err(HttpError::reqwest));
Ok(HttpResponse::from_parts(parts, body))
}
}

/// A factory for [`HttpClient`]
pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
/// Create a new [`HttpClient`] with the provided [`ClientOptions`]
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient>;
}

/// [`HttpConnector`] using [`reqwest::Client`]
#[derive(Debug, Default)]
#[allow(missing_copy_implementations)]
pub struct ReqwestConnector {}
#[cfg(feature = "reqwest")]
pub(crate) fn default_connector() -> crate::Result<Arc<dyn HttpConnector>> {
Ok(Arc::new(ReqwestConnector::default()))
}

#[cfg(not(feature = "reqwest"))]
pub(crate) fn default_connector() -> crate::Result<Arc<dyn HttpConnector>> {
const MSG: &str = "No HTTP Client, either enable reqwest feature or override HttpConnector";

Err(crate::Error::NotSupported {
source: MSG.to_string().into(),
})
}

#[cfg(feature = "reqwest")]
mod reqwest {
use super::*;
use crate::client::HttpResponseBody;
use ::reqwest::{Client, Request};
use http_body_util::BodyExt;
#[async_trait]
impl HttpService for Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
let (parts, body) = req.into_parts();

let url = parts.uri.to_string().parse().unwrap();
let mut req = Request::new(parts.method, url);
*req.headers_mut() = parts.headers;
*req.body_mut() = Some(body.into_reqwest());

let r = self.execute(req).await.map_err(HttpError::reqwest)?;
let res: http::Response<_> = r.into();
let (parts, body) = res.into_parts();

let body = HttpResponseBody::new(body.map_err(HttpError::reqwest));
Ok(HttpResponse::from_parts(parts, body))
}
}

/// [`HttpConnector`] using [`reqwest::Client`]
#[derive(Debug, Default)]
#[allow(missing_copy_implementations)]
pub struct ReqwestConnector {}

impl HttpConnector for ReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let client = options.client()?;
Ok(HttpClient::new(client))
impl HttpConnector for ReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let client = options.client()?;
Ok(HttpClient::new(client))
}
}
}

#[cfg(feature = "reqwest")]
pub use reqwest::*;
5 changes: 2 additions & 3 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPay
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use http::header::{
CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_RANGE,
CONTENT_TYPE,
ToStrError, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_RANGE, CONTENT_TYPE,
};
use http::StatusCode;
use reqwest::header::ToStrError;

/// A client that can perform a get request
#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub(crate) enum Error {
MissingEtag,

#[error("Received header containing non-ASCII data")]
BadHeader { source: reqwest::header::ToStrError },
BadHeader { source: http::header::ToStrError },

#[error("Last-Modified Header missing from response")]
MissingLastModified,
Expand Down
Loading
Loading