Skip to content

Commit

Permalink
Fix a race condition on create and forget operations (#711)
Browse files Browse the repository at this point in the history
* Fix a race condition on create and forget operations

Signed-off-by: Monthon Klongklaew <[email protected]>

* PR comments

Signed-off-by: Monthon Klongklaew <[email protected]>

* Don't panic when cannot remove inode from superblock

Signed-off-by: Monthon Klongklaew <[email protected]>

* PR comments

Signed-off-by: Monthon Klongklaew <[email protected]>

---------

Signed-off-by: Monthon Klongklaew <[email protected]>
  • Loading branch information
monthonk authored Jan 26, 2024
1 parent 85c98fa commit 25aff50
Showing 1 changed file with 134 additions and 73 deletions.
207 changes: 134 additions & 73 deletions mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,54 +131,62 @@ impl Superblock {
/// The kernel may forget a number of references (`n`) in one forget message to our FUSE implementation.
/// If the lookup count reaches zero, it is safe for the [Superblock] to delete the [Inode].
pub fn forget(&self, ino: InodeNo, n: u64) {
let mut inodes = self.inner.inodes.write().unwrap();
match inodes.get(&ino) {
None => {
let inode = {
if let Some(inode) = self.inner.inodes.read().unwrap().get(&ino).cloned() {
inode
} else {
debug_assert!(
false,
"forget should not be called on inode already removed from superblock"
);
error!("forget called on inode {ino} already removed from the superblock");
return;
}
Some(inode) => {
logging::record_name(inode.name());
let new_lookup_count = inode.dec_lookup_count(n);
if new_lookup_count == 0 {
// Safe to remove, kernel no longer has a reference to it.
trace!(ino, "removing inode from superblock");
let inode = inodes.remove(&ino).expect("we just retrieved it");
};

logging::record_name(inode.name());
let new_lookup_count = inode.dec_lookup_count(n);
if new_lookup_count == 0 {
// Safe to remove, kernel no longer has a reference to it.
trace!(ino, "removing inode from superblock");
let Some(inode) = self.inner.inodes.write().unwrap().remove(&ino) else {
error!("forget called on inode {ino} already removed from the superblock");
return;
};

let parent = {
if let Some(parent) = self.inner.inodes.read().unwrap().get(&inode.parent()).cloned() {
parent
} else {
// Should be impossible for this to fail (VFS inodes reference their parent, so
// children need to be freed first), but let's not crash in a `forget` function...
let Some(parent) = inodes.get(&inode.parent()) else {
debug_assert!(false, "children should be forgotten before parents");
return;
};
let mut parent_state = parent.inner.sync.write().unwrap();
let InodeKindData::Directory {
children,
writing_children,
..
} = &mut parent_state.kind_data
else {
unreachable!("parent is always a directory");
};
if let Some(child) = children.get(inode.name()) {
// Don't accidentally remove a newer inode (e.g. remote shadowing local)
if child.ino() == ino {
children.remove(inode.name());
}
}
writing_children.remove(&ino);

if let Ok(state) = inode.get_inode_state() {
metrics::counter!(
"metadata_cache.inode_forgotten_before_expiry",
state.stat.is_valid().into(),
);
};
debug_assert!(false, "children should be forgotten before parents");
return;
}
};
let mut parent_state = parent.inner.sync.write().unwrap();
let InodeKindData::Directory {
children,
writing_children,
..
} = &mut parent_state.kind_data
else {
unreachable!("parent is always a directory");
};
if let Some(child) = children.get(inode.name()) {
// Don't accidentally remove a newer inode (e.g. remote shadowing local)
if child.ino() == ino {
children.remove(inode.name());
}
}
writing_children.remove(&ino);

if let Ok(state) = inode.get_inode_state() {
metrics::counter!(
"metadata_cache.inode_forgotten_before_expiry",
state.stat.is_valid().into(),
);
};
}
}

Expand Down Expand Up @@ -343,47 +351,51 @@ impl Superblock {
.to_str()
.ok_or_else(|| InodeError::InvalidFileName(name.to_owned()))?;

let parent_inode = self.inner.get(dir)?;
let mut parent_state = parent_inode.get_mut_inode_state()?;
// Put inode creation in a block so we don't hold the lock on the parent state longer than needed.
let lookup = {
let parent_inode = self.inner.get(dir)?;
let mut parent_state = parent_inode.get_mut_inode_state()?;

// Check again for the child now that the parent is locked, since we might have lost to a
// racing lookup. (It would be nice to lock the parent and *then* lookup, but we'd have to
// hold that lock across the remote API calls).
let InodeKindData::Directory { children, .. } = &mut parent_state.kind_data else {
return Err(InodeError::NotADirectory(parent_inode.err()));
};
if let Some(inode) = children.get(name) {
return Err(InodeError::FileAlreadyExists(inode.err()));
}

let stat = match kind {
// Objects don't have an ETag until they are uploaded to S3
InodeKind::File => InodeStat::for_file(
0,
OffsetDateTime::now_utc(),
None,
None,
None,
self.inner.config.cache_config.file_ttl,
),
InodeKind::Directory => {
InodeStat::for_directory(self.inner.mount_time, self.inner.config.cache_config.dir_ttl)
// Check again for the child now that the parent is locked, since we might have lost to a
// racing lookup. (It would be nice to lock the parent and *then* lookup, but we'd have to
// hold that lock across the remote API calls).
let InodeKindData::Directory { children, .. } = &mut parent_state.kind_data else {
return Err(InodeError::NotADirectory(parent_inode.err()));
};
if let Some(inode) = children.get(name) {
return Err(InodeError::FileAlreadyExists(inode.err()));
}
};

let state = InodeState {
stat: stat.clone(),
kind_data: InodeKindData::default_for(kind),
write_status: WriteStatus::LocalUnopened,
lookup_count: 0,
reader_count: 0,
let stat = match kind {
// Objects don't have an ETag until they are uploaded to S3
InodeKind::File => InodeStat::for_file(
0,
OffsetDateTime::now_utc(),
None,
None,
None,
self.inner.config.cache_config.file_ttl,
),
InodeKind::Directory => {
InodeStat::for_directory(self.inner.mount_time, self.inner.config.cache_config.dir_ttl)
}
};

let state = InodeState {
stat: stat.clone(),
kind_data: InodeKindData::default_for(kind),
write_status: WriteStatus::LocalUnopened,
lookup_count: 0,
reader_count: 0,
};
let inode = self
.inner
.create_inode_locked(&parent_inode, &mut parent_state, name, kind, state, true)?;
LookedUp { inode, stat }
};
let inode = self
.inner
.create_inode_locked(&parent_inode, &mut parent_state, name, kind, state, true)?;

self.inner.remember(&inode);
Ok(LookedUp { inode, stat })
self.inner.remember(&lookup.inode);
Ok(lookup)
}

/// Remove local-only empty directory, i.e., the ones created by mkdir.
Expand Down Expand Up @@ -2797,4 +2809,53 @@ mod tests {
assert_eq!(file_inodestat.ctime, ts);
assert_eq!(file_inodestat.mtime, ts);
}

#[cfg(feature = "shuttle")]
mod shuttle_tests {
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
use shuttle::{check_pct, check_random, thread};
use shuttle::{future::block_on, sync::Arc};

use super::*;

#[test]
fn test_create_and_forget_race_condition() {
async fn test_helper() {
let client_config = MockClientConfig {
bucket: "test_bucket".to_string(),
part_size: 1024 * 1024,
..Default::default()
};
let client = Arc::new(MockClient::new(client_config));

let name = "foo";
client.add_object(name, b"foo".into());

let superblock = Arc::new(Superblock::new("test_bucket", &Default::default(), Default::default()));

let lookup = superblock.lookup(&client, ROOT_INODE_NO, name.as_ref()).await.unwrap();
let lookup_count = lookup.inode.inner.sync.read().unwrap().lookup_count;
assert_eq!(lookup_count, 1);
let ino = lookup.inode.ino();

let superblock_clone = superblock.clone();
let forget_task = thread::spawn(move || {
superblock_clone.forget(ino, 1);
});

let file_name = "bar";
superblock
.create(&client, ROOT_INODE_NO, file_name.as_ref(), InodeKind::File)
.await
.unwrap();

forget_task.join().unwrap();
let lookup_count = lookup.inode.inner.sync.read().unwrap().lookup_count;
assert_eq!(lookup_count, 0);
}

check_random(|| block_on(test_helper()), 1000);
check_pct(|| block_on(test_helper()), 1000, 3);
}
}
}

0 comments on commit 25aff50

Please sign in to comment.