Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: Add enabled-by-default "fs" feature #6636

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ jobs:
# targets.
- name: Run clippy with default features
run: cargo clippy -- -D warnings
- name: Run clippy without default features
run: cargo clippy --no-default-features -- -D warnings
- name: Run clippy with fs features
run: cargo clippy --no-default-features --features fs -- -D warnings
- name: Run clippy with aws feature
run: cargo clippy --features aws -- -D warnings
- name: Run clippy with gcp feature
Expand Down
4 changes: 3 additions & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ percent-encoding = "2.1"
snafu = { version = "0.8", default-features = false, features = ["std", "rust_1_61"] }
tracing = { version = "0.1" }
url = "2.2"
walkdir = "2"
walkdir = { version = "2", optional = true }

# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
Expand All @@ -60,8 +60,10 @@ md-5 = { version = "0.10.6", default-features = false, optional = true }
nix = { version = "0.29.0", features = ["fs"] }

[features]
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
fs = ["walkdir"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud", "md-5"]
http = ["cloud"]
Expand Down
4 changes: 4 additions & 0 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl ObjectStore for ChunkedStore {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let r = self.inner.get_opts(location, options).await?;
let stream = match r.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
crate::local::chunked_stream(file, path, r.range.clone(), self.chunk_size)
}
Expand Down Expand Up @@ -178,7 +179,9 @@ impl ObjectStore for ChunkedStore {
mod tests {
use futures::StreamExt;

#[cfg(feature = "fs")]
use crate::integration::*;
#[cfg(feature = "fs")]
use crate::local::LocalFileSystem;
use crate::memory::InMemory;
use crate::path::Path;
Expand Down Expand Up @@ -209,6 +212,7 @@ mod tests {
}
}

#[cfg(feature = "fs")]
#[tokio::test]
async fn test_chunked() {
let temporary = tempfile::tempdir().unwrap();
Expand Down
16 changes: 11 additions & 5 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
//!
//! Feature flags are used to enable support for other implementations:
//!
#![cfg_attr(
feature = "fs",
doc = "* Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)"
)]
#![cfg_attr(
feature = "gcp",
doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
Expand Down Expand Up @@ -513,7 +517,7 @@ pub mod gcp;
#[cfg(feature = "http")]
pub mod http;
pub mod limit;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
pub mod local;
pub mod memory;
pub mod path;
Expand Down Expand Up @@ -557,15 +561,15 @@ pub use upload::*;
pub use util::{coalesce_ranges, collect_bytes, GetRange, OBJECT_STORE_COALESCE_DEFAULT};

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
use crate::util::maybe_spawn_blocking;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -1028,6 +1032,7 @@ pub struct GetResult {
/// be able to optimise the case of a file already present on local disk
pub enum GetResultPayload {
/// The file, path
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
File(std::fs::File, std::path::PathBuf),
/// An opaque stream of bytes
Stream(BoxStream<'static, Result<Bytes>>),
Expand All @@ -1036,6 +1041,7 @@ pub enum GetResultPayload {
impl Debug for GetResultPayload {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
Self::File(_, _) => write!(f, "GetResultPayload(File)"),
Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
}
Expand All @@ -1047,7 +1053,7 @@ impl GetResult {
pub async fn bytes(self) -> Result<Bytes> {
let len = self.range.end - self.range.start;
match self.payload {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(mut file, path) => {
maybe_spawn_blocking(move || {
file.seek(SeekFrom::Start(self.range.start as _))
Expand Down Expand Up @@ -1087,7 +1093,7 @@ impl GetResult {
/// no additional complexity or overheads
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
match self.payload {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(file, path) => {
const CHUNK_SIZE: usize = 8 * 1024;
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
Expand Down
1 change: 1 addition & 0 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {

fn permit_get_result(r: GetResult, permit: OwnedSemaphorePermit) -> GetResult {
let payload = match r.payload {
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
v @ GetResultPayload::File(_, _) => v,
GetResultPayload::Stream(s) => {
GetResultPayload::Stream(PermitWrapper::new(s, permit).boxed())
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
use crate::local::LocalFileSystem;
use crate::memory::InMemory;
use crate::path::Path;
Expand Down Expand Up @@ -179,7 +179,7 @@ where
let path = Path::parse(path)?;

let store = match scheme {
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
ObjectStoreScheme::Local => Box::new(LocalFileSystem::new()) as _,
ObjectStoreScheme::Memory => Box::new(InMemory::new()) as _,
#[cfg(feature = "aws")]
Expand Down
2 changes: 2 additions & 0 deletions object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,10 @@ fn usize_to_u32_saturate(x: usize) -> u32 {
}

fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult {
#[allow(clippy::infallible_destructuring_match)]
let s = match result.payload {
GetResultPayload::Stream(s) => s,
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
GetResultPayload::File(_, _) => unimplemented!(),
};

Expand Down
2 changes: 1 addition & 1 deletion object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
}
}

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
/// Takes a function and spawns it to a tokio blocking pool if available
pub(crate) async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
Expand Down
Loading