Skip to content

Commit

Permalink
Introduce ExpirableSet
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro committed Jan 15, 2024
1 parent 4fe2afe commit 311de65
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 41 deletions.
3 changes: 3 additions & 0 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ pub struct CacheConfig {
pub file_ttl: Duration,
/// How long the kernel will cache metadata for directories
pub dir_ttl: Duration,
/// Maximum number of negative entries to cache.
pub negative_cache_size: usize,
}

impl Default for CacheConfig {
Expand All @@ -310,6 +312,7 @@ impl Default for CacheConfig {
serve_lookup_from_cache: false,
file_ttl,
dir_ttl,
negative_cache_size: 100_000,
}
}
}
Expand Down
170 changes: 137 additions & 33 deletions mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ use crate::sync::{Arc, RwLock};
mod expiry;
use expiry::Expiry;

mod expirable_set;
use expirable_set::ExpirableSet;

mod readdir;
pub use readdir::ReaddirHandle;

Expand Down Expand Up @@ -83,6 +86,7 @@ pub struct Superblock {
struct SuperblockInner {
bucket: String,
inodes: RwLock<HashMap<InodeNo, Inode>>,
known_missing_keys: RwLock<ExpirableSet>,
next_ino: AtomicU64,
mount_time: OffsetDateTime,
config: SuperblockConfig,
Expand Down Expand Up @@ -119,9 +123,12 @@ impl Superblock {
let mut inodes = HashMap::new();
inodes.insert(ROOT_INODE_NO, root);

let cache = ExpirableSet::new(config.cache_config.negative_cache_size, config.cache_config.file_ttl);

let inner = SuperblockInner {
bucket: bucket.to_owned(),
inodes: RwLock::new(inodes),
known_missing_keys: RwLock::new(cache),
next_ino: AtomicU64::new(2),
mount_time,
config,
Expand Down Expand Up @@ -598,7 +605,7 @@ impl SuperblockInner {
};

let lookup = match lookup {
Some(lookup) => lookup,
Some(lookup) => lookup?,
None => {
let remote = self.remote_lookup(client, parent_ino, name).await?;
self.update_from_remote(parent_ino, name, remote)?
Expand All @@ -611,30 +618,44 @@ impl SuperblockInner {

/// Lookup an [Inode] against known directory entries in the parent,
/// verifying any returned entry has not expired.
fn cache_lookup(&self, parent_ino: InodeNo, name: &str) -> Option<LookedUp> {
fn do_cache_lookup(parent: Inode, name: &str) -> Option<LookedUp> {
fn cache_lookup(&self, parent_ino: InodeNo, name: &str) -> Option<Result<LookedUp, InodeError>> {
fn do_cache_lookup(
superblock: &SuperblockInner,
parent: Inode,
name: &str,
) -> Option<Result<LookedUp, InodeError>> {
match &parent.get_inode_state().ok()?.kind_data {
InodeKindData::File { .. } => unreachable!("parent should be a directory!"),
InodeKindData::Directory { children, .. } => {
let inode = children.get(name)?;
let inode_stat = &inode.get_inode_state().ok()?.stat;
if inode_stat.is_valid() {
let lookup = LookedUp {
inode: inode.clone(),
stat: inode_stat.clone(),
};
return Some(lookup);
if let Some(inode) = children.get(name) {
let inode_stat = &inode.get_inode_state().ok()?.stat;
if inode_stat.is_valid() {
let lookup = LookedUp {
inode: inode.clone(),
stat: inode_stat.clone(),
};
return Some(Ok(lookup));
}
}
}
};

if superblock
.known_missing_keys
.read()
.unwrap()
.contains(parent.ino(), name)
{
return Some(Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())));
}

None
}

let lookup = self
.get(parent_ino)
.ok()
.and_then(|parent| do_cache_lookup(parent, name));
.and_then(|parent| do_cache_lookup(self, parent, name));

match &lookup {
Some(lookup) => trace!("lookup returned from cache: {:?}", lookup),
Expand Down Expand Up @@ -789,7 +810,7 @@ impl SuperblockInner {
}

// Fast path: try with only a read lock on the directory first.
if let Some(looked_up) = Self::try_update_fast_path(&parent, name, &remote)? {
if let Some(looked_up) = self.try_update_fast_path(&parent, name, &remote)? {
return Ok(looked_up);
}

Expand All @@ -799,6 +820,7 @@ impl SuperblockInner {
/// Try to update the inode for the given name in the parent directory with only a read lock on
/// the parent.
fn try_update_fast_path(
&self,
parent: &Inode,
name: &str,
remote: &Option<RemoteLookup>,
Expand All @@ -809,7 +831,13 @@ impl SuperblockInner {
InodeKindData::Directory { children, .. } => children.get(name),
};
match (remote, inode) {
(None, None) => Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())),
(None, None) => {
if self.config.cache_config.serve_lookup_from_cache {
// Insert or update validity of negative cache entry.
self.known_missing_keys.write().unwrap().insert(parent.ino(), name);
}
Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err()))
}
(Some(remote), Some(existing_inode)) => {
let mut existing_state = existing_inode.get_mut_inode_state()?;
let existing_is_remote = existing_state.write_status == WriteStatus::Remote;
Expand Down Expand Up @@ -846,7 +874,13 @@ impl SuperblockInner {
InodeKindData::Directory { children, .. } => children.get(name).cloned(),
};
match (remote, inode) {
(None, None) => Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())),
(None, None) => {
if self.config.cache_config.serve_lookup_from_cache {
// Insert or update validity of negative cache entry.
self.known_missing_keys.write().unwrap().insert(parent.ino(), name);
}
Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err()))
}
(None, Some(existing_inode)) => {
let InodeKindData::Directory {
children,
Expand Down Expand Up @@ -1016,6 +1050,8 @@ impl SuperblockInner {
}
if let Some(existing_inode) = existing_inode {
writing_children.remove(&existing_inode.ino());
} else {
self.known_missing_keys.write().unwrap().remove(parent.ino(), name);
}
}
}
Expand Down Expand Up @@ -1672,9 +1708,9 @@ mod tests {
let client = Arc::new(MockClient::new(client_config));

let keys = &[
format!("{prefix}dir0/file0.txt"),
format!("{prefix}dir0/sdir0/file0.txt"),
format!("{prefix}dir0/sdir0/file1.txt"),
format!("{prefix}file0.txt"),
format!("{prefix}sdir0/file0.txt"),
format!("{prefix}sdir0/file1.txt"),
];

let object_size = 30;
Expand All @@ -1700,29 +1736,97 @@ mod tests {
serve_lookup_from_cache: true,
dir_ttl: ttl,
file_ttl: ttl,
..Default::default()
},
s3_personality: S3Personality::Standard,
},
);

let dir0 = superblock
.lookup(&client, FUSE_ROOT_INODE, &OsString::from("dir0"))
.await
.expect("should exist");
let file0 = superblock
.lookup(&client, dir0.inode.ino(), &OsString::from("file0.txt"))
.await
.expect("should exist");
let entries = ["file0.txt", "sdir0"];
for entry in entries {
_ = superblock
.lookup(&client, FUSE_ROOT_INODE, entry.as_ref())
.await
.expect("should exist");
}

client.remove_object(file0.inode.full_key());
for key in keys {
client.remove_object(key);
}

let file0 = superblock
.lookup(&client, dir0.inode.ino(), &OsString::from("file0.txt"))
.await;
if cached {
file0.expect("file0 inode should still be served from cache");
for entry in entries {
let lookup = superblock.lookup(&client, FUSE_ROOT_INODE, entry.as_ref()).await;
if cached {
lookup.expect("inode should still be served from cache");
} else {
lookup.expect_err("entry should have expired, and not be found in S3");
}
}
}

#[test_case(true; "cached")]
#[test_case(false; "not cached")]
#[tokio::test]
async fn test_negative_lookup_with_caching(cached: bool) {
let bucket = "test_bucket";
let prefix = "prefix/";
let client_config = MockClientConfig {
bucket: bucket.to_string(),
part_size: 1024 * 1024,
..Default::default()
};
let client = Arc::new(MockClient::new(client_config));

let prefix = Prefix::new(prefix).expect("valid prefix");
let ttl = if cached {
std::time::Duration::from_secs(60 * 60 * 24 * 7) // 7 days should be enough
} else {
file0.expect_err("file0 entry should have expired, and not be found in S3");
std::time::Duration::ZERO
};
let superblock = Superblock::new(
bucket,
&prefix,
SuperblockConfig {
cache_config: CacheConfig {
serve_lookup_from_cache: true,
dir_ttl: ttl,
file_ttl: ttl,
..Default::default()
},
s3_personality: S3Personality::Standard,
},
);

let entries = ["file0.txt", "sdir0"];
for entry in entries {
_ = superblock
.lookup(&client, FUSE_ROOT_INODE, entry.as_ref())
.await
.expect_err("should not exist");
}

let keys = &[
format!("{prefix}file0.txt"),
format!("{prefix}sdir0/file0.txt"),
format!("{prefix}sdir0/file1.txt"),
];

let object_size = 30;
let mut last_modified = OffsetDateTime::UNIX_EPOCH;
for key in keys {
let mut obj = MockObject::constant(0xaa, object_size, ETag::for_tests());
last_modified += Duration::days(1);
obj.set_last_modified(last_modified);
client.add_object(key, obj);
}

for entry in entries {
let lookup = superblock.lookup(&client, FUSE_ROOT_INODE, entry.as_ref()).await;
if cached {
lookup.expect_err("negative entry should still be valid in the cache, so the new key should not have been looked up in S3");
} else {
lookup.expect("new object should have been looked up in S3");
}
}
}

Expand Down
70 changes: 70 additions & 0 deletions mountpoint-s3/src/inode/expirable_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::time::Duration;

use linked_hash_map::LinkedHashMap;

use super::{expiry::Expiry, InodeNo};

/// A bounded set of (parent_ino, child_name) entries that expire after a fixed time.
#[derive(Debug)]
pub struct ExpirableSet {
    // Entries in insertion order (front = oldest); the value is the entry's expiry deadline.
    map: LinkedHashMap<Key, Expiry>,
    // Current entry count, maintained manually alongside `map` by insert/remove.
    size: usize,
    // Upper bound on entries; oldest entries are evicted on insert once exceeded.
    max_size: usize,
    // How long a newly inserted entry remains valid.
    validity: Duration,
}

/// Identifies a negative entry by its parent inode number and child name.
#[derive(Debug, Hash, PartialEq, Eq)]
struct Key {
    parent_ino: InodeNo,
    child_name: String,
}

impl ExpirableSet {
pub fn new(max_size: usize, validity: Duration) -> Self {
Self {
map: Default::default(),
size: 0,
max_size,
validity,
}
}

pub fn contains(&self, parent_ino: InodeNo, child_name: &str) -> bool {
let key = Key {
parent_ino,
child_name: child_name.to_owned(),
};
let Some(value) = self.map.get(&key) else {
return false;
};
value.is_valid()
}

pub fn remove(&mut self, parent_ino: InodeNo, child_name: &str) {
let key = Key {
parent_ino,
child_name: child_name.to_owned(),
};
if self.map.remove(&key).is_some() {
self.size -= 1;
}
}

pub fn insert(&mut self, parent_ino: InodeNo, child_name: &str) {
let expiry = Expiry::from_now(self.validity);
let key = Key {
parent_ino,
child_name: child_name.to_owned(),
};
if self.map.insert(key, expiry).is_none() {
self.size += 1;

while self.size > self.max_size || self.map.front().is_some_and(|(_, e)| !e.is_valid()) {
if self.map.pop_front().is_none() {
break;
}
self.size -= 1;
}
}
}
}
1 change: 1 addition & 0 deletions mountpoint-s3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
serve_lookup_from_cache: true,
dir_ttl: metadata_cache_ttl,
file_ttl: metadata_cache_ttl,
..Default::default()
};

let cache_config = match args.max_cache_size {
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3/tests/direct_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ where
serve_lookup_from_cache: true,
dir_ttl: Duration::from_secs(600),
file_ttl: Duration::from_secs(600),
..Default::default()
},
..Default::default()
},
Expand Down
Loading

0 comments on commit 311de65

Please sign in to comment.