Cleanup cache dir at mount and exit #620

Merged

Changes from all commits
2 changes: 2 additions & 0 deletions mountpoint-s3/src/data_cache.rs
@@ -4,13 +4,15 @@
//! reducing both the number of requests as well as the latency for the reads.
//! Ultimately, this means reduced cost in terms of S3 billing as well as compute time.

mod cache_directory;
mod disk_data_cache;
mod in_memory_data_cache;

use mountpoint_s3_client::types::ETag;
use thiserror::Error;

pub use crate::checksums::ChecksummedBytes;
pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;

153 changes: 153 additions & 0 deletions mountpoint-s3/src/data_cache/cache_directory.rs
@@ -0,0 +1,153 @@
//! Provides functionality related to the inner cache directory Mountpoint creates or uses.
//! Mountpoint attempts to clean up the contents at mount and on exit.
//!
//! Mountpoint uses a directory inside the user-provided cache directory
//! to mitigate any impact from the user providing a directory that already contains data.
//! Using a new sub-directory minimizes the interference with the existing directory structure,
//! and limits the risk from deleting or overwriting data to files written within this sub-directory.

use std::fs;
use std::io;
use std::path::{Path, PathBuf};

use thiserror::Error;

/// Cache directory that has been created and emptied, and will be emptied when dropped.
#[derive(Debug)]
pub struct ManagedCacheDir {
managed_path: PathBuf,
}

#[derive(Debug, Error)]
pub enum ManagedCacheDirError {
#[error("creation of cache sub-directory failed due to IO error: {0}")]
CreationFailure(#[source] io::Error),
#[error("cleanup of cache sub-directory failed due to IO error: {0}")]
CleanupFailure(#[source] io::Error),
}

impl ManagedCacheDir {
/// Create a new directory inside the provided parent path, cleaning it of any contents if it already exists.
pub fn new_from_parent<P: AsRef<Path>>(parent_path: P) -> Result<Self, ManagedCacheDirError> {
let managed_path = parent_path.as_ref().join("mountpoint-cache");

if let Err(mkdir_err) = fs::create_dir(&managed_path) {
match mkdir_err.kind() {
io::ErrorKind::AlreadyExists => (),
_kind => return Err(ManagedCacheDirError::CreationFailure(mkdir_err)),
}
}

let managed_cache_dir = Self { managed_path };
managed_cache_dir.clean()?;

Ok(managed_cache_dir)
}

/// Clear the cache sub-directory
fn clean(&self) -> Result<(), ManagedCacheDirError> {
tracing::debug!(cache_subdirectory = ?self.managed_path, "cleaning up contents of cache sub-directory");
let read_dir = fs::read_dir(self.managed_path.as_path()).map_err(ManagedCacheDirError::CleanupFailure)?;
for entry in read_dir {
let path = entry.map_err(ManagedCacheDirError::CleanupFailure)?.path();
if path.is_dir() {
fs::remove_dir_all(path).map_err(ManagedCacheDirError::CleanupFailure)?;
} else {
fs::remove_file(path).map_err(ManagedCacheDirError::CleanupFailure)?;
}
}
tracing::trace!(cache_subdirectory = ?self.managed_path, "cleanup complete");
Ok(())
}

/// Retrieve a reference to the managed path
pub fn as_path(&self) -> &Path {
self.managed_path.as_path()
}

/// Create an owned copy of the managed path
pub fn as_path_buf(&self) -> PathBuf {
self.managed_path.clone()
}
}

Review thread on as_path_buf:
Contributor: This seems unnecessary. Can't you do something like .as_path().to_owned() if required?
Contributor (Author): We can, but I found we were doing that a few times and it felt more ergonomic to just add the method here.
Contributor (Author): I've left this in for now. We do use it at the moment.
Contributor: I still think we should remove. But not a blocker.

impl Drop for ManagedCacheDir {
fn drop(&mut self) {
if let Err(err) = self.clean() {
tracing::error!(managed_cache_path = ?self.managed_path, "failed to clean cache directory: {err}");
}
}
}

#[cfg(test)]
mod tests {
use super::ManagedCacheDir;

use std::fs;

#[test]
fn test_unused() {
let temp_dir = tempfile::tempdir().unwrap();
let expected_path = temp_dir.path().join("mountpoint-cache");

let managed_dir =
ManagedCacheDir::new_from_parent(temp_dir.path()).expect("creating managed dir should succeed");
assert!(expected_path.try_exists().unwrap(), "{expected_path:?} should exist");

drop(managed_dir);
let dir_entries = fs::read_dir(&expected_path)
.expect("cache dir should still exist")
.count();
assert!(dir_entries == 0, "directory should be empty");

temp_dir.close().unwrap();
}

#[test]
fn test_used() {
let temp_dir = tempfile::tempdir().unwrap();
let expected_path = temp_dir.path().join("mountpoint-cache");

let managed_dir =
ManagedCacheDir::new_from_parent(temp_dir.path()).expect("creating managed dir should succeed");
assert!(expected_path.try_exists().unwrap(), "{expected_path:?} should exist");

fs::File::create(expected_path.join("file.txt"))
.expect("should be able to create file within managed directory");
fs::create_dir(expected_path.join("dir")).expect("should be able to create dir within managed directory");
fs::File::create(expected_path.join("dir/file.txt"))
.expect("should be able to create file within subdirectory");

drop(managed_dir);
let dir_entries = fs::read_dir(&expected_path)
.expect("cache dir should still exist")
.count();
assert!(dir_entries == 0, "directory should be empty");

temp_dir.close().unwrap();
}

#[test]
fn test_already_exists() {
let temp_dir = tempfile::tempdir().unwrap();
let expected_path = temp_dir.path().join("mountpoint-cache");

fs::create_dir_all(expected_path.join("dir")).unwrap();
fs::File::create(expected_path.join("dir/file.txt")).unwrap();

let managed_dir =
ManagedCacheDir::new_from_parent(temp_dir.path()).expect("creating managed dir should succeed");
assert!(expected_path.try_exists().unwrap(), "{expected_path:?} should exist");

let dir_entries = fs::read_dir(&expected_path).unwrap().count();
assert!(dir_entries == 0, "directory should be empty");

drop(managed_dir);
let dir_entries = fs::read_dir(&expected_path)
.expect("cache dir should still exist")
.count();
assert!(dir_entries == 0, "directory should be empty");

temp_dir.close().unwrap();
}
}
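
For anyone trying the branch locally, here is a minimal sketch of the intended lifecycle, not part of the PR itself: the parent path and file names are illustrative, and it assumes the mountpoint_s3 crate from this branch as a dependency. The sub-directory is created and emptied on construction, and emptied again when the handle is dropped.

```rust
use mountpoint_s3::data_cache::ManagedCacheDir;
use std::fs;
use std::path::Path;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical parent path standing in for the user-provided cache directory.
    let parent = Path::new("/tmp/mountpoint-cache-demo");
    fs::create_dir_all(parent)?;

    // Creates (or reuses) <parent>/mountpoint-cache and empties any existing contents.
    let managed = ManagedCacheDir::new_from_parent(parent)?;
    fs::write(managed.as_path().join("block.data"), b"cached bytes")?;

    // Dropping the handle empties the sub-directory again; the directory itself remains.
    drop(managed);
    assert_eq!(fs::read_dir(parent.join("mountpoint-cache"))?.count(), 0);
    Ok(())
}
```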
21 changes: 20 additions & 1 deletion mountpoint-s3/src/fuse/session.rs
@@ -13,9 +13,14 @@ use crate::sync::Arc;
/// this process to be interrupted.
pub struct FuseSession {
unmounter: SessionUnmounter,
/// Waits for messages from threads or signal handler.
receiver: mpsc::Receiver<Message>,
/// List of closures or functions to call when session is exiting.
on_close: Vec<OnClose>,
}

type OnClose = Box<dyn FnOnce()>;

impl FuseSession {
/// Create worker threads to dispatch requests for a FUSE session.
pub fn new<FS: Filesystem + Send + Sync + 'static>(
@@ -50,6 +55,8 @@ impl FuseSession {
Ok(thd_result) => {
if let Err(fuse_worker_error) = thd_result {
error!(thread_name, "worker thread failed: {fuse_worker_error:?}");
} else {
trace!(thread_name, "worker thread exited OK");
}
}
};
@@ -70,15 +77,27 @@
Ok(Self {
unmounter,
receiver: rx,
on_close: Default::default(),
})
}

/// Add a new handler which is executed when this session is shutting down.
pub fn run_on_close(&mut self, handler: OnClose) {
self.on_close.push(handler);
}

/// Block until the file system is unmounted or this process is interrupted via SIGTERM/SIGINT.
/// When that happens, unmount the file system (if it hasn't already been unmounted).
pub fn join(mut self) -> anyhow::Result<()> {
let msg = self.receiver.recv();
trace!("received message {msg:?}, unmounting filesystem");
trace!("received message {msg:?}, closing filesystem session");

trace!("executing {} handler(s) on close", self.on_close.len());
for handler in self.on_close {
handler();
}

trace!("unmounting filesystem");
self.unmounter.unmount().context("failed to unmount FUSE session")
}
}
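
A quick illustration of the new hook, as a sketch rather than part of the change: handlers are plain boxed closures, run inside join() in registration order, after the shutdown message arrives and before the filesystem is unmounted. Building a FuseSession needs a real FUSE mount, so only the caller-side wiring is shown; tracing and anyhow (already Mountpoint dependencies) are assumed.

```rust
use mountpoint_s3::fuse::session::FuseSession;

// Register a shutdown handler on an existing session, then block until exit.
// `FuseSession` and `run_on_close` are the types from the diff above.
fn wait_for_exit(mut session: FuseSession) -> anyhow::Result<()> {
    session.run_on_close(Box::new(|| {
        tracing::info!("session shutting down; releasing resources");
    }));
    // Runs the registered handlers, then unmounts the filesystem.
    session.join()
}
```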
16 changes: 14 additions & 2 deletions mountpoint-s3/src/main.rs
@@ -9,6 +9,8 @@ use std::time::Duration;
use anyhow::{anyhow, Context as _};
use clap::{value_parser, Parser};
use fuser::{MountOption, Session};
#[cfg(feature = "caching")]
use mountpoint_s3::data_cache::ManagedCacheDir;
use mountpoint_s3::fs::S3FilesystemConfig;
use mountpoint_s3::fuse::session::FuseSession;
use mountpoint_s3::fuse::S3FuseFilesystem;
@@ -586,9 +588,11 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
};

if let Some(cache_config) = cache_config {
let cache = DiskDataCache::new(path, cache_config);
let managed_cache_dir =
ManagedCacheDir::new_from_parent(path).context("failed to create cache directory")?;
let cache = DiskDataCache::new(managed_cache_dir.as_path_buf(), cache_config);
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
return create_filesystem(
let mut fuse_session = create_filesystem(
client,
prefetcher,
&args.bucket_name,
@@ -597,6 +601,14 @@
fuse_config,
&bucket_description,
);
Review comment (Contributor): nit: could just ? here and simplify the rest

if let Ok(session) = &mut fuse_session {
session.run_on_close(Box::new(move || {
drop(managed_cache_dir);
}));
}

return fuse_session;
}
}
}
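
To make the reviewer's nit concrete, here is a generic before/after illustration; `Session` and `create_session` are stand-ins so the sketch compiles on its own, not the real FuseSession or create_filesystem. Propagating the error with `?` up front lets the rest of the code work with a plain value instead of mutating through a Result.

```rust
// Stand-in type mirroring the run_on_close API from the diff above.
struct Session {
    on_close: Vec<Box<dyn FnOnce()>>,
}

impl Session {
    fn run_on_close(&mut self, handler: Box<dyn FnOnce()>) {
        self.on_close.push(handler);
    }
}

fn create_session() -> Result<Session, String> {
    Ok(Session { on_close: Vec::new() })
}

// As merged: mutate through the Result, then return it.
fn mount_as_merged() -> Result<Session, String> {
    let mut session = create_session();
    if let Ok(session) = &mut session {
        session.run_on_close(Box::new(|| {}));
    }
    session
}

// Reviewer's suggestion: propagate the error with `?` first, then register the handler.
fn mount_simplified() -> Result<Session, String> {
    let mut session = create_session()?;
    session.run_on_close(Box::new(|| {}));
    Ok(session)
}
```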