Complete the upload on flush (#526)
Currently, Mountpoint will complete an upload in two cases:
* on `release`, that is, when the last file descriptor pointing to an open file handle is closed.
  This is transparent to the caller, but allows neither reporting the outcome of the upload nor
  blocking until it completes. As a result, a read-after-close may not succeed because the upload
  is still in progress.
* on `fsync`, which blocks and can return an error to the caller, but must be explicitly
  invoked before closing the file (see the sketch below).
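
For example, the `fsync` path can be exercised from Rust like this (a minimal sketch; the path is a hypothetical location under a Mountpoint mount):

```rust
use std::fs::File;
use std::io::Write;

fn upload_with_fsync(path: &str, data: &[u8]) -> std::io::Result<()> {
    let mut file = File::create(path)?; // e.g. a file under a Mountpoint mount
    file.write_all(data)?;
    // `sync_all` issues fsync, which completes the upload synchronously and
    // surfaces any upload failure as an error here.
    file.sync_all()?;
    Ok(())
}
```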

This change implements the `flush` operation, which is invoked when a file descriptor is closed.
On `flush`, as on `fsync`, Mountpoint completes the upload, blocks until it finishes, and returns
success or failure. To support common usage patterns where `flush` is invoked multiple times, it
differs from `fsync` in being a no-op when invoked before any data has been written, or when
invoked by a different process than the one that originally opened the file.
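
To illustrate the new behavior, a minimal sketch (assuming a hypothetical bucket mounted at `/mnt/bucket`, and using the `libc` crate): `close` now blocks until the upload finishes and reports its outcome, so a read-after-close sees the new object:

```rust
use std::fs::File;
use std::io::Write;
use std::os::unix::io::IntoRawFd;

fn main() -> std::io::Result<()> {
    let path = "/mnt/bucket/example.txt"; // hypothetical mount point
    let mut file = File::create(path)?;
    file.write_all(b"hello")?;
    // Rust's `Drop` for `File` discards close errors, so close explicitly
    // through the raw descriptor to observe the upload outcome.
    let fd = file.into_raw_fd();
    if unsafe { libc::close(fd) } != 0 {
        return Err(std::io::Error::last_os_error());
    }
    // `close` completed the upload, so the object is already visible.
    assert_eq!(std::fs::read_to_string(path)?, "hello");
    Ok(())
}
```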

Signed-off-by: Alessandro Passaro <[email protected]>
passaro authored Oct 19, 2023
1 parent 7eae9e1 commit 2431807
Showing 9 changed files with 341 additions and 56 deletions.
7 changes: 7 additions & 0 deletions doc/SEMANTICS.md
@@ -143,6 +143,13 @@ but with some limitations:
Synchronization operations (`fsync`, `fdatasync`) complete the upload of the object to S3 and disallow
further writes.

`close` also generally completes the upload of the object and reports an error if it was not successful.
However, if the file is empty, or if `close` is invoked by a different process than the one that
originally opened the file, `close` returns immediately and the upload only completes asynchronously
after the last reference to the file is closed. These exceptions allow Mountpoint to support common
usage patterns, seen in tools like `dd` and `touch` or in shell redirection, that hold multiple
descriptors for an open file and keep writing through one after closing another, as sketched below.
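
A minimal sketch of the descriptor-duplication pattern these exceptions accommodate (illustrative only; uses the `libc` crate and a hypothetical mount at `/mnt/bucket`):

```rust
use std::os::unix::io::IntoRawFd;

fn main() -> std::io::Result<()> {
    let file = std::fs::File::create("/mnt/bucket/out.dat")?;
    let fd = file.into_raw_fd();
    unsafe {
        // `touch`/`dd` style: duplicate the descriptor, then close the original.
        let dup_fd = libc::dup(fd);
        assert!(dup_fd >= 0);
        // This close flushes an empty file, so the upload is not completed yet.
        assert_eq!(libc::close(fd), 0);
        // Writing continues through the duplicate...
        let buf = b"data";
        assert_eq!(libc::write(dup_fd, buf.as_ptr().cast(), buf.len()), 4);
        // ...and this final close completes the upload and reports its outcome.
        assert_eq!(libc::close(dup_fd), 0);
    }
    Ok(())
}
```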

Space allocation operations (`fallocate`, `posix_fallocate`) are not supported.

Changing last access and modification times (`utime`) is supported only on files that are being written.
4 changes: 2 additions & 2 deletions mountpoint-s3/CHANGELOG.md
@@ -1,10 +1,10 @@
## Unreleased

### Breaking changes
-* ...
+* Mountpoint will now complete file uploads at `close` time, and `close` will return an error if the upload was not successful. Before this change, `close` did not wait for the upload to complete, which could cause confusing results for applications that try to read a file they just wrote. ([#526](https://github.com/awslabs/mountpoint-s3/pull/526))

### Other changes
-* Fixed a bug that cause poor performance for sequential reads in some cases ([#488](https://github.com/awslabs/mountpoint-s3/pull/488)). A workaround we have previously shared for this issue (setting the `--max-threads` argument to `1`) is no longer necessary with this fix. ([#556](https://github.com/awslabs/mountpoint-s3/pull/556))
+* Fixed a bug that caused poor performance for sequential reads in some cases ([#488](https://github.com/awslabs/mountpoint-s3/pull/488)). A workaround we have previously shared for this issue (setting the `--max-threads` argument to `1`) is no longer necessary with this fix. ([#556](https://github.com/awslabs/mountpoint-s3/pull/556))

## v1.0.2 (September 22, 2023)

115 changes: 97 additions & 18 deletions mountpoint-s3/src/fs.rs
@@ -69,6 +69,7 @@ impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
lookup: &LookedUp,
ino: InodeNo,
flags: i32,
pid: u32,
fs: &S3Filesystem<Client, Runtime>,
) -> Result<FileHandleType<Client, Runtime>, Error> {
// We can't support O_SYNC writes because they require the data to go to stable storage
@@ -77,7 +78,7 @@
return Err(err!(libc::EINVAL, "O_SYNC and O_DSYNC are not supported"));
}

- let handle = match fs.superblock.write(&fs.client, ino, lookup.inode.parent()).await {
+ let handle = match fs.superblock.write(&fs.client, ino, lookup.inode.parent(), pid).await {
Ok(handle) => handle,
Err(e) => {
return Err(e.into());
@@ -127,7 +128,12 @@ enum UploadState<Client: ObjectClient> {

impl<Client: ObjectClient> UploadState<Client> {
async fn write(&mut self, offset: i64, data: &[u8], key: &str) -> Result<u32, Error> {
- let upload = self.get_upload_in_progress(key)?;
+ let upload = match self {
+ Self::InProgress { request, .. } => request,
+ Self::Completed => return Err(err!(libc::EIO, "upload already completed for key {:?}", key)),
+ Self::Failed(e) => return Err(err!(*e, "upload already aborted for key {:?}", key)),
+ };

match upload.write(offset, data).await {
Ok(len) => Ok(len as u32),
Err(e) => {
@@ -139,20 +145,42 @@ impl<Client: ObjectClient> UploadState<Client> {
error!(?err, "error updating the inode status");
}
}
- Self::Failed(_) | Self::Completed => unreachable!("checked by get_upload_in_progress"),
+ Self::Failed(_) | Self::Completed => unreachable!("checked above"),
};
Err(e.into())
}
}
}

- async fn complete(&mut self, key: &str) -> Result<(), Error> {
- // Check that the upload is still in progress.
- _ = self.get_upload_in_progress(key)?;
+ async fn complete(&mut self, key: &str, ignore_if_empty: bool, pid: Option<u32>) -> Result<(), Error> {
match self {
Self::InProgress { request, handle } => {
if ignore_if_empty && request.size() == 0 {
trace!(key, "not completing upload because file is empty");
return Ok(());
}
if let Some(pid) = pid {
let open_pid = handle.pid();
if !are_from_same_process(open_pid, pid) {
trace!(
key,
pid,
open_pid,
"not completing upload because current pid differs from pid at open"
);
return Ok(());
}
}
}
Self::Completed => return Ok(()),
Self::Failed(e) => return Err(err!(*e, "upload already aborted for key {:?}", key)),
};

let (upload, handle) = match std::mem::replace(self, Self::Completed) {
Self::InProgress { request, handle } => (request, handle),
- Self::Failed(_) | Self::Completed => unreachable!("checked by get_upload_in_progress"),
+ Self::Failed(_) | Self::Completed => unreachable!("checked above"),
};

let result = Self::complete_upload(upload, key, handle).await;
if let Err(e) = &result {
*self = Self::Failed(e.to_errno());
@@ -182,14 +210,44 @@ impl<Client: ObjectClient> UploadState<Client> {
}
put_result
}
}

- fn get_upload_in_progress(&mut self, key: &str) -> Result<&mut UploadRequest<Client>, Error> {
- match self {
- Self::InProgress { request, .. } => Ok(request),
- Self::Completed => Err(err!(libc::EIO, "upload already completed for key {:?}", key)),
- Self::Failed(e) => Err(err!(*e, "upload already aborted for key {:?}", key)),
/// Get the thread-group id (tgid) from a process id (pid).
/// Despite the names, the process id is actually the thread id
/// and the thread-group id is the parent process id.
/// Returns `None` if unable to find or parse the task status.
/// Not supported on macOS.
fn get_tgid(pid: u32) -> Option<u32> {
#[cfg(not(target_os = "macos"))]
{
use std::fs::File;
use std::io::{BufRead, BufReader};

let path = format!("/proc/{}/task/{}/status", pid, pid);
let file = File::open(path).ok()?;
for line in BufReader::new(file).lines() {
let line = line.ok()?;
if line.starts_with("Tgid:") {
return line["Tgid: ".len()..].trim().parse::<u32>().ok();
}
}
}

None
}

/// Check whether two pids correspond to the same process.
fn are_from_same_process(pid1: u32, pid2: u32) -> bool {
if pid1 == pid2 {
return true;
}
let Some(tgid1) = get_tgid(pid1) else {
return false;
};
let Some(tgid2) = get_tgid(pid2) else {
return false;
};
tgid1 == tgid2
}
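
// Illustrative sketch, not part of this diff: FUSE reports the caller's thread id
// as `pid`, so two threads of one process can present different pids;
// `are_from_same_process` maps both to their shared tgid. Linux-only.
#[cfg(all(test, target_os = "linux"))]
#[test]
fn same_process_different_threads() {
    let (tx, rx) = std::sync::mpsc::channel();
    let child = std::thread::spawn(move || {
        tx.send(unsafe { libc::syscall(libc::SYS_gettid) } as u32).unwrap();
        // Keep the thread alive so /proc/<tid> still exists during the check.
        std::thread::sleep(std::time::Duration::from_millis(200));
    });
    let main_tid = unsafe { libc::syscall(libc::SYS_gettid) } as u32;
    let worker_tid = rx.recv().unwrap();
    assert_ne!(main_tid, worker_tid);
    assert!(are_from_same_process(main_tid, worker_tid));
    child.join().unwrap();
}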

#[derive(Debug, Clone)]
@@ -470,8 +528,8 @@ where
self.superblock.forget(ino, n);
}

- pub async fn open(&self, ino: InodeNo, flags: i32) -> Result<Opened, Error> {
- trace!("fs:open with ino {:?} flags {:?}", ino, flags);
+ pub async fn open(&self, ino: InodeNo, flags: i32, pid: u32) -> Result<Opened, Error> {
+ trace!("fs:open with ino {:?} flags {:?} pid {:?}", ino, flags, pid);

let force_revalidate = self.config.cache_config.prefer_s3;
let lookup = self.superblock.getattr(&self.client, ino, force_revalidate).await?;
@@ -488,10 +546,10 @@ where
FileHandleType::new_read_handle(&lookup).await?
} else {
trace!("fs:open choosing write handle for O_RDWR");
- FileHandleType::new_write_handle(&lookup, ino, flags, self).await?
+ FileHandleType::new_write_handle(&lookup, ino, flags, pid, self).await?
}
} else if flags & libc::O_WRONLY != 0 {
- FileHandleType::new_write_handle(&lookup, ino, flags, self).await?
+ FileHandleType::new_write_handle(&lookup, ino, flags, pid, self).await?
} else {
FileHandleType::new_read_handle(&lookup).await?
};
@@ -765,7 +823,7 @@ where
}
}

- pub async fn fsync(&self, _ino: InodeNo, fh: u64, _datasync: bool) -> Result<(), Error> {
+ async fn complete_upload(&self, fh: u64, ignore_if_empty: bool, pid: Option<u32>) -> Result<(), Error> {
let file_handle = {
let file_handles = self.file_handles.read().await;
match file_handles.get(&fh) {
@@ -777,14 +835,35 @@ where
FileHandleType::Write(request) => request.lock().await,
FileHandleType::Read { .. } => return Ok(()),
};
- match request.complete(&file_handle.full_key).await {
+ match request.complete(&file_handle.full_key, ignore_if_empty, pid).await {
// According to the `fsync` man page we should return ENOSPC instead of EFBIG if it's a
// space-related failure.
Err(e) if e.to_errno() == libc::EFBIG => Err(err!(libc::ENOSPC, source:e, "object too big")),
ret => ret,
}
}

pub async fn fsync(&self, _ino: InodeNo, fh: u64, _datasync: bool) -> Result<(), Error> {
self.complete_upload(fh, false, None).await
}

pub async fn flush(&self, _ino: InodeNo, fh: u64, _lock_owner: u64, pid: u32) -> Result<(), Error> {
// We generally want to complete the upload when users close a file descriptor (and flush
// is invoked), so that we can notify them of the outcome. However, since different file
// descriptors can point to the same file handle, flush can be invoked multiple times on
// a file handle and will fail once the object has been uploaded.
// While we cannot avoid this issue in the general case, we want to support common usage
// patterns, in particular:
// * commands like `touch` and `dd` duplicate a file descriptor immediately after open,
// close (flush) the original one, and then start writing on the duplicate. We support
// these cases by only completing the upload on flush when some bytes have been written.
// * a `fork` on a process with open file descriptors will duplicate them for the child
// process. In many cases, the child will then immediately close (flush) the duplicated
// file descriptors. We will not complete the upload if we can detect that the process
// invoking flush is different from the one that originally opened the file.
self.complete_upload(fh, true, Some(pid)).await
}
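
// Illustrative sketch, not part of this diff, of the `fork` pattern the pid check
// above accommodates: a child inherits the parent's descriptors and typically
// closes them immediately. Uses the `libc` crate.
fn fork_and_close(fd: libc::c_int) {
    match unsafe { libc::fork() } {
        0 => {
            // Child: this close invokes `flush` with the child's pid, which
            // differs from the pid recorded at open, so the upload is skipped.
            unsafe { libc::close(fd) };
            std::process::exit(0);
        }
        child => {
            // Parent: keeps writing; its own close (or fsync) completes the
            // upload and reports the outcome.
            assert!(child > 0);
            let mut status = 0;
            unsafe { libc::waitpid(child, &mut status, 0) };
        }
    }
}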

pub async fn release(
&self,
ino: InodeNo,
21 changes: 12 additions & 9 deletions mountpoint-s3/src/fuse.rs
@@ -93,9 +93,9 @@ where
block_on(self.fs.forget(ino, nlookup));
}

#[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=ino))]
fn open(&self, _req: &Request<'_>, ino: InodeNo, flags: i32, reply: ReplyOpen) {
match block_on(self.fs.open(ino, flags).in_current_span()) {
#[instrument(level="warn", skip_all, fields(req=req.unique(), ino=ino, pid=req.pid()))]
fn open(&self, req: &Request<'_>, ino: InodeNo, flags: i32, reply: ReplyOpen) {
match block_on(self.fs.open(ino, flags, req.pid()).in_current_span()) {
Ok(opened) => reply.opened(opened.fh, opened.flags),
Err(e) => fuse_error!("open", reply, e),
}
@@ -255,6 +255,14 @@ where
}
}

#[instrument(level="warn", skip_all, fields(req=req.unique(), ino=ino, fh=fh, pid=req.pid()))]
fn flush(&self, req: &Request<'_>, ino: u64, fh: u64, lock_owner: u64, reply: ReplyEmpty) {
match block_on(self.fs.flush(ino, fh, lock_owner, req.pid()).in_current_span()) {
Ok(()) => reply.ok(),
Err(e) => fuse_error!("flush", reply, e),
}
}

#[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=ino, fh=fh))]
fn release(
&self,
@@ -311,7 +319,7 @@ where
}
}

#[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=ino, fh=fh, offset=offset, length=data.len()))]
#[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=ino, fh=fh, offset=offset, length=data.len(), pid=_req.pid()))]
fn write(
&self,
_req: &Request<'_>,
@@ -420,11 +428,6 @@ where
fuse_unsupported!("link", reply, libc::EPERM);
}

#[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=ino, fh=fh))]
fn flush(&self, _req: &Request<'_>, ino: u64, fh: u64, _lock_owner: u64, reply: ReplyEmpty) {
fuse_unsupported!("flush", reply);
}

#[instrument(level="warn", skip_all, fields(req=_req.unique(), ino=ino, fh=fh, datasync=datasync))]
fn fsyncdir(&self, _req: &Request<'_>, ino: u64, fh: u64, datasync: bool, reply: ReplyEmpty) {
fuse_unsupported!("fsyncdir", reply);
26 changes: 19 additions & 7 deletions mountpoint-s3/src/inode.rs
@@ -261,10 +261,11 @@ impl Superblock {
_client: &OC,
ino: InodeNo,
parent_ino: InodeNo,
pid: u32,
) -> Result<WriteHandle, InodeError> {
trace!(?ino, parent=?parent_ino, "write");

- WriteHandle::new(self.inner.clone(), ino, parent_ino)
+ WriteHandle::new(self.inner.clone(), ino, parent_ino, pid)
}

/// Start a readdir stream for the given directory inode
@@ -991,23 +992,34 @@ pub struct WriteHandle {
inner: Arc<SuperblockInner>,
ino: InodeNo,
parent_ino: InodeNo,
pid: u32,
}

impl WriteHandle {
/// Check the status on the inode and set it to writing state if it's writable
- fn new(inner: Arc<SuperblockInner>, ino: InodeNo, parent_ino: InodeNo) -> Result<Self, InodeError> {
+ fn new(inner: Arc<SuperblockInner>, ino: InodeNo, parent_ino: InodeNo, pid: u32) -> Result<Self, InodeError> {
let inode = inner.get(ino)?;
let mut state = inode.get_mut_inode_state()?;
match state.write_status {
WriteStatus::LocalUnopened => {
state.write_status = WriteStatus::LocalOpen;
- Ok(Self { inner, ino, parent_ino })
+ Ok(Self {
+ inner,
+ ino,
+ parent_ino,
+ pid,
+ })
}
WriteStatus::LocalOpen => Err(InodeError::InodeAlreadyWriting(inode.err())),
WriteStatus::Remote => Err(InodeError::InodeNotWritable(inode.err())),
}
}

/// The pid of the process which opened this handle.
pub fn pid(&self) -> u32 {
self.pid
}

/// Update status of the inode and of containing "local" directories.
pub fn finish_writing(self) -> Result<(), InodeError> {
let inode = self.inner.get(self.ino)?;
@@ -1867,7 +1879,7 @@ mod tests {
.await
.unwrap();
superblock
- .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE)
+ .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE, 0)
.await
.unwrap();
expected_list.push(filename);
@@ -1923,7 +1935,7 @@ mod tests {
.await
.unwrap();
superblock
- .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE)
+ .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE, 0)
.await
.unwrap();
expected_list.push(filename);
@@ -2247,7 +2259,7 @@ mod tests {
.unwrap();

let writehandle = superblock
- .write(&client, new_inode.inode.ino(), leaf_dir_ino)
+ .write(&client, new_inode.inode.ino(), leaf_dir_ino, 0)
.await
.unwrap();

@@ -2416,7 +2428,7 @@ mod tests {
.unwrap();

let writehandle = superblock
- .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE)
+ .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE, 0)
.await
.unwrap();

