Skip to content

Commit

Permalink
Introduce ObjectStore trait to replace ObjectClient in mountpoint-s3
Browse files Browse the repository at this point in the history
Introduce a new `ObjectStore` trait that replaces `ObjectClient` in the mountpoint-s3 crate. In addition to most of `ObjectClient` methods, `ObjectStore` also declares a new `prefetch` method returning a `PrefetchGetObject` which allows callers to read the object content. `PrefetchGetObject` is where `ObjectStore` implementations can add object data caching.

This change also reworks the `Prefetcher` so that `ObjectStore` implementations can delegate `prefetch` to it. The main changes to `Prefetcher` are:
* it is now generic on the `ObjectPartStream` (previously `ObjectPartFeed`), rather than using dynamic dispatch.
* the logic to spawn a new task for each `GetObject` request and handle the object body parts returned was moved into `ObjectPartStream`.

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro committed Nov 1, 2023
1 parent 404ba9c commit 9877b6c
Show file tree
Hide file tree
Showing 20 changed files with 1,140 additions and 715 deletions.
8 changes: 5 additions & 3 deletions mountpoint-s3/examples/fs_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use clap::{Arg, ArgAction, Command};
use fuser::{BackgroundSession, MountOption, Session};
use mountpoint_s3::fuse::S3FuseFilesystem;
use mountpoint_s3::store::default_store;
use mountpoint_s3::S3FilesystemConfig;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use std::{
fs::File,
fs::OpenOptions,
fs::{File, OpenOptions},
io::{self, BufRead, BufReader},
sync::Arc,
time::Instant,
};
use tempfile::tempdir;
Expand Down Expand Up @@ -164,8 +165,9 @@ fn mount_file_system(bucket_name: &str, region: &str, throughput_target_gbps: Op
bucket_name,
mountpoint.to_str().unwrap()
);
let store = default_store(Arc::new(client), runtime, Default::default());
let session = Session::new(
S3FuseFilesystem::new(client, runtime, bucket_name, &Default::default(), filesystem_config),
S3FuseFilesystem::new(store, bucket_name, &Default::default(), filesystem_config),
mountpoint,
&options,
)
Expand Down
6 changes: 4 additions & 2 deletions mountpoint-s3/examples/prefetch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::time::Instant;

use clap::{Arg, Command};
use futures::executor::{block_on, ThreadPool};
use mountpoint_s3::prefetch::Prefetcher;
use mountpoint_s3::prefetch::{ClientPartStream, Prefetcher};
use mountpoint_s3::store::PrefetchGetObject;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::S3CrtClient;
Expand Down Expand Up @@ -80,7 +81,8 @@ fn main() {

for i in 0..iterations.unwrap_or(1) {
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let manager = Prefetcher::new(client.clone(), runtime, Default::default());
let part_stream = ClientPartStream::new(client.clone(), runtime);
let manager = Prefetcher::new(part_stream, Default::default());
let received_size = Arc::new(AtomicU64::new(0));

let start = Instant::now();
Expand Down
112 changes: 53 additions & 59 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! FUSE file system types and operations, not tied to the _fuser_ library bindings.
use futures::task::Spawn;
use nix::unistd::{getgid, getuid};
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
Expand All @@ -12,11 +11,10 @@ use tracing::{debug, error, trace};
use fuser::{FileAttr, KernelConfig};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::ObjectClient;

use crate::inode::{Inode, InodeError, InodeKind, LookedUp, ReaddirHandle, Superblock, WriteHandle};
use crate::prefetch::{PrefetchGetObject, PrefetchReadError, Prefetcher, PrefetcherConfig};
use crate::prefix::Prefix;
use crate::store::{ObjectStore, PrefetchGetObject, PrefetchReadError};
use crate::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use crate::sync::{Arc, AsyncMutex, AsyncRwLock};
use crate::upload::{UploadRequest, Uploader};
Expand Down Expand Up @@ -49,37 +47,45 @@ impl DirHandle {
}

#[derive(Debug)]
struct FileHandle<Client: ObjectClient, Runtime> {
struct FileHandle<Store: ObjectStore> {
inode: Inode,
full_key: String,
object_size: u64,
typ: FileHandleType<Client, Runtime>,
typ: FileHandleType<Store>,
}

#[derive(Debug)]
enum FileHandleType<Client: ObjectClient, Runtime> {
enum FileHandleType<Store: ObjectStore> {
Read {
request: AsyncMutex<Option<PrefetchGetObject<Client, Runtime>>>,
request: AsyncMutex<Option<Store::PrefetchGetObject>>,
etag: ETag,
},
Write(AsyncMutex<UploadState<Client>>),
Write(AsyncMutex<UploadState<Store>>),
}

impl<Store: ObjectStore + std::fmt::Debug> std::fmt::Debug for FileHandleType<Store> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Read { request: _, etag } => f.debug_struct("Read").field("etag", etag).finish(),
Self::Write(state) => f.debug_tuple("Write").field(state).finish(),
}
}
}

impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
impl<Store: ObjectStore> FileHandleType<Store> {
async fn new_write_handle(
lookup: &LookedUp,
ino: InodeNo,
flags: i32,
pid: u32,
fs: &S3Filesystem<Client, Runtime>,
) -> Result<FileHandleType<Client, Runtime>, Error> {
fs: &S3Filesystem<Store>,
) -> Result<FileHandleType<Store>, Error> {
// We can't support O_SYNC writes because they require the data to go to stable storage
// at `write` time, but we only commit a PUT at `close` time.
if flags & (libc::O_SYNC | libc::O_DSYNC) != 0 {
return Err(err!(libc::EINVAL, "O_SYNC and O_DSYNC are not supported"));
}

let handle = match fs.superblock.write(&fs.client, ino, lookup.inode.parent(), pid).await {
let handle = match fs.superblock.write(&fs.store, ino, lookup.inode.parent(), pid).await {
Ok(handle) => handle,
Err(e) => {
return Err(e.into());
Expand All @@ -96,7 +102,7 @@ impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
Ok(handle)
}

async fn new_read_handle(lookup: &LookedUp) -> Result<FileHandleType<Client, Runtime>, Error> {
async fn new_read_handle(lookup: &LookedUp) -> Result<FileHandleType<Store>, Error> {
if !lookup.stat.is_readable {
return Err(err!(
libc::EACCES,
Expand All @@ -117,17 +123,17 @@ impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
}

#[derive(Debug)]
enum UploadState<Client: ObjectClient> {
enum UploadState<Store: ObjectStore> {
InProgress {
request: UploadRequest<Client>,
request: UploadRequest<Store>,
handle: WriteHandle,
},
Completed,
// Remember the failure reason to respond to retries
Failed(libc::c_int),
}

impl<Client: ObjectClient> UploadState<Client> {
impl<Store: ObjectStore> UploadState<Store> {
async fn write(&mut self, offset: i64, data: &[u8], key: &str) -> Result<u32, Error> {
let upload = match self {
Self::InProgress { request, .. } => request,
Expand Down Expand Up @@ -196,7 +202,7 @@ impl<Client: ObjectClient> UploadState<Client> {
}
}

async fn complete_upload(upload: UploadRequest<Client>, key: &str, handle: WriteHandle) -> Result<(), Error> {
async fn complete_upload(upload: UploadRequest<Store>, key: &str, handle: WriteHandle) -> Result<(), Error> {
let size = upload.size();
let put_result = match upload.complete().await {
Ok(_) => {
Expand Down Expand Up @@ -303,8 +309,6 @@ pub struct S3FilesystemConfig {
pub dir_mode: u16,
/// File permissions
pub file_mode: u16,
/// Prefetcher configuration
pub prefetcher_config: PrefetcherConfig,
/// Allow delete
pub allow_delete: bool,
/// Storage class to be used for new object uploads
Expand All @@ -323,46 +327,38 @@ impl Default for S3FilesystemConfig {
gid,
dir_mode: 0o755,
file_mode: 0o644,
prefetcher_config: PrefetcherConfig::default(),
allow_delete: false,
storage_class: None,
}
}
}

#[derive(Debug)]
pub struct S3Filesystem<Client: ObjectClient, Runtime> {
pub struct S3Filesystem<Store: ObjectStore> {
config: S3FilesystemConfig,
client: Arc<Client>,
store: Store,
superblock: Superblock,
prefetcher: Prefetcher<Client, Runtime>,
uploader: Uploader<Client>,
uploader: Uploader<Store>,
bucket: String,
#[allow(unused)]
prefix: Prefix,
next_handle: AtomicU64,
dir_handles: AsyncRwLock<HashMap<u64, Arc<DirHandle>>>,
file_handles: AsyncRwLock<HashMap<u64, Arc<FileHandle<Client, Runtime>>>>,
file_handles: AsyncRwLock<HashMap<u64, Arc<FileHandle<Store>>>>,
}

impl<Client, Runtime> S3Filesystem<Client, Runtime>
impl<Store> S3Filesystem<Store>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Store: ObjectStore + Send + Sync + 'static,
{
pub fn new(client: Client, runtime: Runtime, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self {
pub fn new(store: Store, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self {
let superblock = Superblock::new(bucket, prefix, config.cache_config.clone());

let client = Arc::new(client);

let prefetcher = Prefetcher::new(client.clone(), runtime, config.prefetcher_config);
let uploader = Uploader::new(client.clone(), config.storage_class.to_owned());
let uploader = Uploader::new(store.clone(), config.storage_class.to_owned());

Self {
config,
client,
store,
superblock,
prefetcher,
uploader,
bucket: bucket.to_string(),
prefix: prefix.clone(),
Expand Down Expand Up @@ -430,10 +426,9 @@ pub trait ReadReplier {
fn error(self, error: Error) -> Self::Replied;
}

impl<Client, Runtime> S3Filesystem<Client, Runtime>
impl<Store> S3Filesystem<Store>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Store: ObjectStore + Send + Sync + 'static,
{
pub async fn init(&self, config: &mut KernelConfig) -> Result<(), libc::c_int> {
let _ = config.add_capabilities(fuser::consts::FUSE_DO_READDIRPLUS);
Expand Down Expand Up @@ -484,7 +479,7 @@ where
pub async fn lookup(&self, parent: InodeNo, name: &OsStr) -> Result<Entry, Error> {
trace!("fs:lookup with parent {:?} name {:?}", parent, name);

let lookup = self.superblock.lookup(&self.client, parent, name).await?;
let lookup = self.superblock.lookup(&self.store, parent, name).await?;
let attr = self.make_attr(&lookup);
Ok(Entry {
ttl: lookup.validity(),
Expand All @@ -496,7 +491,7 @@ where
pub async fn getattr(&self, ino: InodeNo) -> Result<Attr, Error> {
trace!("fs:getattr with ino {:?}", ino);

let lookup = self.superblock.getattr(&self.client, ino, false).await?;
let lookup = self.superblock.getattr(&self.store, ino, false).await?;
let attr = self.make_attr(&lookup);

Ok(Attr {
Expand All @@ -519,7 +514,7 @@ where
atime,
mtime
);
let lookup = self.superblock.setattr(&self.client, ino, atime, mtime).await?;
let lookup = self.superblock.setattr(&self.store, ino, atime, mtime).await?;
let attr = self.make_attr(&lookup);

Ok(Attr {
Expand All @@ -537,7 +532,7 @@ where
trace!("fs:open with ino {:?} flags {:?} pid {:?}", ino, flags, pid);

let force_revalidate = !self.config.cache_config.serve_lookup_from_cache;
let lookup = self.superblock.getattr(&self.client, ino, force_revalidate).await?;
let lookup = self.superblock.getattr(&self.store, ino, force_revalidate).await?;

match lookup.inode.kind() {
InodeKind::Directory => return Err(InodeError::IsDirectory(lookup.inode.err()).into()),
Expand Down Expand Up @@ -610,8 +605,8 @@ where

if request.is_none() {
*request = Some(
self.prefetcher
.get(&self.bucket, &handle.full_key, handle.object_size, file_etag),
self.store
.prefetch(&self.bucket, &handle.full_key, handle.object_size, file_etag),
);
}

Expand All @@ -620,14 +615,13 @@ where
Ok(bytes) => reply.data(&bytes),
Err(e) => reply.error(err!(libc::EIO, source:e, "integrity error")),
},
Err(PrefetchReadError::GetRequestFailed(ObjectClientError::ServiceError(
Err(ObjectClientError::ServiceError(PrefetchReadError::GetRequestFailed(
GetObjectError::PreconditionFailed,
))) => reply.error(err!(libc::ESTALE, "object was mutated remotely")),
Err(PrefetchReadError::Integrity(e)) => reply.error(err!(libc::EIO, source:e, "integrity error")),
Err(e @ PrefetchReadError::GetRequestFailed(_))
| Err(e @ PrefetchReadError::GetRequestTerminatedUnexpectedly) => {
reply.error(err!(libc::EIO, source:e, "get request failed"))
Err(ObjectClientError::ServiceError(PrefetchReadError::Integrity(e))) => {
reply.error(err!(libc::EIO, source:e, "integrity error"))
}
Err(e) => reply.error(err!(libc::EIO, source:e, "get request failed")),
}
}

Expand All @@ -649,7 +643,7 @@ where

let lookup = self
.superblock
.create(&self.client, parent, name, InodeKind::File)
.create(&self.store, parent, name, InodeKind::File)
.await?;
let attr = self.make_attr(&lookup);
Ok(Entry {
Expand All @@ -662,7 +656,7 @@ where
pub async fn mkdir(&self, parent: InodeNo, name: &OsStr, _mode: libc::mode_t, _umask: u32) -> Result<Entry, Error> {
let lookup = self
.superblock
.create(&self.client, parent, name, InodeKind::Directory)
.create(&self.store, parent, name, InodeKind::Directory)
.await?;
let attr = self.make_attr(&lookup);
Ok(Entry {
Expand Down Expand Up @@ -714,7 +708,7 @@ where
pub async fn opendir(&self, parent: InodeNo, _flags: i32) -> Result<Opened, Error> {
trace!("fs:opendir with parent {:?} flags {:?}", parent, _flags);

let inode_handle = self.superblock.readdir(&self.client, parent, 1000).await?;
let inode_handle = self.superblock.readdir(&self.store, parent, 1000).await?;

let fh = self.next_handle();
let handle = DirHandle {
Expand Down Expand Up @@ -827,7 +821,7 @@ where
let mut reply = Reply { reply, entries: vec![] };

if dir_handle.offset() < 1 {
let lookup = self.superblock.getattr(&self.client, parent, false).await?;
let lookup = self.superblock.getattr(&self.store, parent, false).await?;
let attr = self.make_attr(&lookup);
let entry = DirectoryEntry {
ino: parent,
Expand All @@ -846,7 +840,7 @@ where
if dir_handle.offset() < 2 {
let lookup = self
.superblock
.getattr(&self.client, dir_handle.handle.parent(), false)
.getattr(&self.store, dir_handle.handle.parent(), false)
.await?;
let attr = self.make_attr(&lookup);
let entry = DirectoryEntry {
Expand All @@ -865,7 +859,7 @@ where
}

loop {
let next = match dir_handle.handle.next(&self.client).await? {
let next = match dir_handle.handle.next(&self.store).await? {
None => return Ok(reply.finish(offset, &dir_handle).await),
Some(next) => next,
};
Expand Down Expand Up @@ -981,7 +975,7 @@ where
}

pub async fn rmdir(&self, parent_ino: InodeNo, name: &OsStr) -> Result<(), Error> {
self.superblock.rmdir(&self.client, parent_ino, name).await?;
self.superblock.rmdir(&self.store, parent_ino, name).await?;
Ok(())
}

Expand All @@ -997,6 +991,6 @@ where
if !self.config.allow_delete {
return Err(err!(libc::EPERM, "deletes are disabled"));
}
Ok(self.superblock.unlink(&self.client, parent_ino, name).await?)
Ok(self.superblock.unlink(&self.store, parent_ino, name).await?)
}
}
Loading

0 comments on commit 9877b6c

Please sign in to comment.