diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs
index b4c574deb..53cb35a20 100644
--- a/mountpoint-s3/src/data_cache.rs
+++ b/mountpoint-s3/src/data_cache.rs
@@ -4,6 +4,7 @@
 //! reducing both the number of requests as well as the latency for the reads.
 //! Ultimately, this means reduced cost in terms of S3 billing as well as compute time.
 
+mod cache_directory;
 mod disk_data_cache;
 mod in_memory_data_cache;
 
@@ -11,6 +12,7 @@ use mountpoint_s3_client::types::ETag;
 use thiserror::Error;
 
 pub use crate::checksums::ChecksummedBytes;
+pub use crate::data_cache::cache_directory::ManagedCacheDir;
 pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
 pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
 
diff --git a/mountpoint-s3/src/data_cache/cache_directory.rs b/mountpoint-s3/src/data_cache/cache_directory.rs
new file mode 100644
index 000000000..1c589ce4c
--- /dev/null
+++ b/mountpoint-s3/src/data_cache/cache_directory.rs
@@ -0,0 +1,153 @@
+//! Provides functionality related to the inner cache directory that Mountpoint creates or uses.
+//! Mountpoint attempts to clean up its contents at mount time and at exit.
+//!
+//! Mountpoint uses a directory inside the user-provided cache directory
+//! to mitigate any impact from the user providing a directory that already contains data.
+//! Using a new sub-directory minimizes interference with the existing directory structure,
+//! and limits the risk of deleting or overwriting data to files written within this sub-directory.
+
+use std::fs;
+use std::io;
+use std::path::{Path, PathBuf};
+
+use thiserror::Error;
+
+/// Cache directory that has been created and emptied, and which will be emptied again when dropped.
+#[derive(Debug)]
+pub struct ManagedCacheDir {
+    managed_path: PathBuf,
+}
+
+#[derive(Debug, Error)]
+pub enum ManagedCacheDirError {
+    #[error("creation of cache sub-directory failed due to IO error: {0}")]
+    CreationFailure(#[source] io::Error),
+    #[error("cleanup of cache sub-directory failed due to IO error: {0}")]
+    CleanupFailure(#[source] io::Error),
+}
+
+impl ManagedCacheDir {
+    /// Create a new directory inside the provided parent path, cleaning it of any contents if it already exists.
+    pub fn new_from_parent<P: AsRef<Path>>(parent_path: P) -> Result<Self, ManagedCacheDirError> {
+        let managed_path = parent_path.as_ref().join("mountpoint-cache");
+
+        if let Err(mkdir_err) = fs::create_dir(&managed_path) {
+            match mkdir_err.kind() {
+                io::ErrorKind::AlreadyExists => (),
+                _kind => return Err(ManagedCacheDirError::CreationFailure(mkdir_err)),
+            }
+        }
+
+        let managed_cache_dir = Self { managed_path };
+        managed_cache_dir.clean()?;
+
+        Ok(managed_cache_dir)
+    }
+
+    /// Clear the cache sub-directory
+    fn clean(&self) -> Result<(), ManagedCacheDirError> {
+        tracing::debug!(cache_subdirectory = ?self.managed_path, "cleaning up contents of cache sub-directory");
+        let read_dir = fs::read_dir(self.managed_path.as_path()).map_err(ManagedCacheDirError::CleanupFailure)?;
+        for entry in read_dir {
+            let path = entry.map_err(ManagedCacheDirError::CleanupFailure)?.path();
+            if path.is_dir() {
+                fs::remove_dir_all(path).map_err(ManagedCacheDirError::CleanupFailure)?;
+            } else {
+                fs::remove_file(path).map_err(ManagedCacheDirError::CleanupFailure)?;
+            }
+        }
+        tracing::trace!(cache_subdirectory = ?self.managed_path, "cleanup complete");
+        Ok(())
+    }
+
+    /// Retrieve a reference to the managed path
+    pub fn as_path(&self) -> &Path {
+        self.managed_path.as_path()
+    }
+
+    /// Create an owned copy of the managed path
+    pub fn as_path_buf(&self) -> PathBuf {
+        self.managed_path.clone()
+    }
+}
+
+impl Drop for ManagedCacheDir {
+    fn drop(&mut self) {
+        if let Err(err) = self.clean() {
+            tracing::error!(managed_cache_path = ?self.managed_path, "failed to clean cache directory: {err}");
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::ManagedCacheDir;
+
+    use std::fs;
+
+    #[test]
+    fn test_unused() {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let expected_path = temp_dir.path().join("mountpoint-cache");
+
+        let managed_dir =
+            ManagedCacheDir::new_from_parent(temp_dir.path()).expect("creating managed dir should succeed");
+        assert!(expected_path.try_exists().unwrap(), "{expected_path:?} should exist");
+
+        drop(managed_dir);
+        let dir_entries = fs::read_dir(&expected_path)
+            .expect("cache dir should still exist")
+            .count();
+        assert!(dir_entries == 0, "directory should be empty");
+
+        temp_dir.close().unwrap();
+    }
+
+    #[test]
+    fn test_used() {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let expected_path = temp_dir.path().join("mountpoint-cache");
+
+        let managed_dir =
+            ManagedCacheDir::new_from_parent(temp_dir.path()).expect("creating managed dir should succeed");
+        assert!(expected_path.try_exists().unwrap(), "{expected_path:?} should exist");
+
+        fs::File::create(expected_path.join("file.txt"))
+            .expect("should be able to create file within managed directory");
+        fs::create_dir(expected_path.join("dir")).expect("should be able to create dir within managed directory");
+        fs::File::create(expected_path.join("dir/file.txt"))
+            .expect("should be able to create file within subdirectory");
+
+        drop(managed_dir);
+        let dir_entries = fs::read_dir(&expected_path)
+            .expect("cache dir should still exist")
+            .count();
+        assert!(dir_entries == 0, "directory should be empty");
+
+        temp_dir.close().unwrap();
+    }
+
+    #[test]
+    fn test_already_exists() {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let expected_path = temp_dir.path().join("mountpoint-cache");
+
+        fs::create_dir_all(expected_path.join("dir")).unwrap();
+        fs::File::create(expected_path.join("dir/file.txt")).unwrap();
+
+        let managed_dir =
+            ManagedCacheDir::new_from_parent(temp_dir.path()).expect("creating managed dir should succeed");
+        assert!(expected_path.try_exists().unwrap(), "{expected_path:?} should exist");
+
+        let dir_entries = fs::read_dir(&expected_path).unwrap().count();
+        assert!(dir_entries == 0, "directory should be empty");
+
+        drop(managed_dir);
+        let dir_entries = fs::read_dir(&expected_path)
+            .expect("cache dir should still exist")
+            .count();
+        assert!(dir_entries == 0, "directory should be empty");
+
+        temp_dir.close().unwrap();
+    }
+}
diff --git a/mountpoint-s3/src/fuse/session.rs b/mountpoint-s3/src/fuse/session.rs
index 11edacbcd..06148f9b3 100644
--- a/mountpoint-s3/src/fuse/session.rs
+++ b/mountpoint-s3/src/fuse/session.rs
@@ -13,9 +13,14 @@ use crate::sync::Arc;
 /// this process to be interrupted.
 pub struct FuseSession {
     unmounter: SessionUnmounter,
+    /// Waits for messages from threads or signal handler.
     receiver: mpsc::Receiver<Message>,
+    /// List of closures or functions to call when the session is exiting.
+    on_close: Vec<OnClose>,
 }
 
+type OnClose = Box<dyn FnOnce()>;
+
 impl FuseSession {
     /// Create worker threads to dispatch requests for a FUSE session.
     pub fn new(
@@ -50,6 +55,8 @@
             Ok(thd_result) => {
                 if let Err(fuse_worker_error) = thd_result {
                     error!(thread_name, "worker thread failed: {fuse_worker_error:?}");
+                } else {
+                    trace!(thread_name, "worker thread exited OK");
                 }
             }
         };
@@ -70,15 +77,27 @@
         Ok(Self {
             unmounter,
             receiver: rx,
+            on_close: Default::default(),
         })
     }
 
+    /// Add a new handler which is executed when this session is shutting down.
+    pub fn run_on_close(&mut self, handler: OnClose) {
+        self.on_close.push(handler);
+    }
+
     /// Block until the file system is unmounted or this process is interrupted via SIGTERM/SIGINT.
     /// When that happens, unmount the file system (if it hasn't already been unmounted).
     pub fn join(mut self) -> anyhow::Result<()> {
         let msg = self.receiver.recv();
-        trace!("received message {msg:?}, unmounting filesystem");
+        trace!("received message {msg:?}, closing filesystem session");
+
+        trace!("executing {} handler(s) on close", self.on_close.len());
+        for handler in self.on_close {
+            handler();
+        }
+
+        trace!("unmounting filesystem");
         self.unmounter.unmount().context("failed to unmount FUSE session")
     }
 }
 
diff --git a/mountpoint-s3/src/main.rs b/mountpoint-s3/src/main.rs
index a17dc05f1..40133b073 100644
--- a/mountpoint-s3/src/main.rs
+++ b/mountpoint-s3/src/main.rs
@@ -9,6 +9,8 @@ use std::time::Duration;
 use anyhow::{anyhow, Context as _};
 use clap::{value_parser, Parser};
 use fuser::{MountOption, Session};
+#[cfg(feature = "caching")]
+use mountpoint_s3::data_cache::ManagedCacheDir;
 use mountpoint_s3::fs::S3FilesystemConfig;
 use mountpoint_s3::fuse::session::FuseSession;
 use mountpoint_s3::fuse::S3FuseFilesystem;
@@ -586,9 +588,11 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
         };
 
         if let Some(cache_config) = cache_config {
-            let cache = DiskDataCache::new(path, cache_config);
+            let managed_cache_dir =
+                ManagedCacheDir::new_from_parent(path).context("failed to create cache directory")?;
+            let cache = DiskDataCache::new(managed_cache_dir.as_path_buf(), cache_config);
             let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
-            return create_filesystem(
+            let mut fuse_session = create_filesystem(
                 client,
                 prefetcher,
                 &args.bucket_name,
@@ -597,6 +601,14 @@
                 fuse_config,
                 &bucket_description,
             );
+
+            if let Ok(session) = &mut fuse_session {
+                session.run_on_close(Box::new(move || {
+                    drop(managed_cache_dir);
+                }));
+            }
+
+            return fuse_session;
         }
     }
 }
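
How the pieces above fit together: `ManagedCacheDir::new_from_parent` creates (or re-uses) a `mountpoint-cache` sub-directory under the user-provided cache path and empties it, `DiskDataCache` is then rooted at that sub-directory via `as_path_buf`, and ownership of the `ManagedCacheDir` handle is moved into a `FuseSession::run_on_close` closure so the directory is emptied again when the session shuts down. The sketch below is illustrative only and not part of the patch; `/tmp/example-cache` is a hypothetical stand-in for the user-supplied `--cache` path, and the commented lines assume a live `FuseSession` named `fuse_session` as in `main.rs`.

use std::path::Path;

use mountpoint_s3::data_cache::ManagedCacheDir;

fn main() -> anyhow::Result<()> {
    // Hypothetical parent directory standing in for the `--cache <PATH>` argument.
    let parent = Path::new("/tmp/example-cache");
    std::fs::create_dir_all(parent)?;

    // Creates `<parent>/mountpoint-cache` if needed and empties any existing contents.
    let managed_cache_dir = ManagedCacheDir::new_from_parent(parent)?;

    // A disk cache would be rooted inside the managed sub-directory, e.g.:
    //     let cache = DiskDataCache::new(managed_cache_dir.as_path_buf(), cache_config);

    // Moving the handle into an on-close handler defers the final cleanup until
    // the FUSE session exits, mirroring the wiring in main.rs:
    //     fuse_session.run_on_close(Box::new(move || drop(managed_cache_dir)));

    // Without that wiring, the handle's Drop impl still empties the directory here,
    // when `managed_cache_dir` goes out of scope.
    drop(managed_cache_dir);
    Ok(())
}

The split between `Drop` and `run_on_close` matters because `mount` returns the `FuseSession` to its caller: if cleanup relied on `Drop` alone, the cache sub-directory would be emptied as soon as `mount` returned, while the filesystem was still serving reads. Moving the handle into the on-close closure extends its lifetime to session shutdown, while `Drop` still covers early error paths.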