Skip to content

Commit

Permalink
Ensure that the file system in tests is unmounted before removing the…
Browse files Browse the repository at this point in the history
… temp dir (#1116)

## Description of change

Ensure that the file system in integration tests is unmounted before
removing the temporary directory. The introduction of `TestSession` in
#1096 inadvertently changed the order in which the temporary directory
and the FUSE session are dropped. Previously it was hidden in the
declaration order. This change makes it explicit in `drop`.

## Does this change impact existing behavior?

No.

## Does this change need a changelog entry in any of the crates?

No.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Nov 7, 2024
1 parent 9d48a72 commit e354067
Show file tree
Hide file tree
Showing 15 changed files with 322 additions and 349 deletions.
102 changes: 53 additions & 49 deletions mountpoint-s3/tests/common/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ use tempfile::TempDir;
use crate::common::{get_crt_client_auth_config, tokio_block_on};

pub trait TestClient: Send {
fn put_object(&mut self, key: &str, value: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
fn put_object(&self, key: &str, value: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
self.put_object_params(key, value, PutObjectParams::default())
}

fn put_object_params(
&mut self,
&self,
key: &str,
value: &[u8],
params: PutObjectParams,
) -> Result<(), Box<dyn std::error::Error>>;

fn remove_object(&mut self, key: &str) -> Result<(), Box<dyn std::error::Error>>;
fn remove_object(&self, key: &str) -> Result<(), Box<dyn std::error::Error>>;

fn contains_dir(&self, key: &str) -> Result<bool, Box<dyn std::error::Error>>;

Expand All @@ -40,13 +40,11 @@ pub trait TestClient: Send {

fn get_object_parts(&self, key: &str) -> Result<Option<Vec<ObjectPart>>, Box<dyn std::error::Error>>;

fn restore_object(&mut self, key: &str, expedited: bool) -> Result<(), Box<dyn std::error::Error>>;
fn restore_object(&self, key: &str, expedited: bool) -> Result<(), Box<dyn std::error::Error>>;

fn is_object_restored(&mut self, key: &str) -> Result<bool, Box<dyn std::error::Error>>;
fn is_object_restored(&self, key: &str) -> Result<bool, Box<dyn std::error::Error>>;
}

pub type TestClientBox = Box<dyn TestClient>;

pub struct TestSessionConfig {
pub part_size: usize,
pub initial_read_window_size: usize,
Expand Down Expand Up @@ -77,9 +75,35 @@ impl TestSessionConfig {

// Holds resources for the testing session and cleans them on drop.
pub struct TestSession {
pub mount_dir: TempDir,
pub session: BackgroundSession,
pub test_client: TestClientBox,
mount_dir: TempDir,
test_client: Box<dyn TestClient>,
// Option so we can explicitly unmount
session: Option<BackgroundSession>,
}

impl TestSession {
pub fn new(mount_dir: TempDir, session: BackgroundSession, test_client: impl TestClient + 'static) -> Self {
Self {
mount_dir,
test_client: Box::new(test_client),
session: Some(session),
}
}

pub fn mount_path(&self) -> &Path {
self.mount_dir.path()
}

pub fn client(&self) -> &dyn TestClient {
self.test_client.as_ref()
}
}

impl Drop for TestSession {
fn drop(&mut self) {
// Explicitly unmount so we know the background thread is gone
self.session.take().unwrap().join();
}
}

pub trait TestSessionCreator: FnOnce(&str, TestSessionConfig) -> TestSession {}
Expand Down Expand Up @@ -165,11 +189,7 @@ pub mod mock_session {
);
let test_client = create_test_client(client, &prefix);

TestSession {
mount_dir,
session,
test_client,
}
TestSession::new(mount_dir, session, test_client)
}

/// Create a FUSE mount backed by a mock object client, with caching, that does not talk to S3
Expand Down Expand Up @@ -206,21 +226,15 @@ pub mod mock_session {
);
let test_client = create_test_client(client, &prefix);

TestSession {
mount_dir,
session,
test_client,
}
TestSession::new(mount_dir, session, test_client)
}
}

fn create_test_client(client: Arc<MockClient>, prefix: &str) -> TestClientBox {
let test_client = MockTestClient {
fn create_test_client(client: Arc<MockClient>, prefix: &str) -> impl TestClient {
MockTestClient {
prefix: prefix.to_owned(),
client,
};

Box::new(test_client)
}
}

struct MockTestClient {
Expand All @@ -230,7 +244,7 @@ pub mod mock_session {

impl TestClient for MockTestClient {
fn put_object_params(
&mut self,
&self,
key: &str,
value: &[u8],
params: PutObjectParams,
Expand All @@ -242,7 +256,7 @@ pub mod mock_session {
Ok(())
}

fn remove_object(&mut self, key: &str) -> Result<(), Box<dyn std::error::Error>> {
fn remove_object(&self, key: &str) -> Result<(), Box<dyn std::error::Error>> {
let full_key = format!("{}{}", self.prefix, key);
self.client.remove_object(&full_key);
Ok(())
Expand Down Expand Up @@ -280,12 +294,12 @@ pub mod mock_session {
Ok(attrs.object_parts.and_then(|parts| parts.parts))
}

fn restore_object(&mut self, key: &str, _expedited: bool) -> Result<(), Box<dyn std::error::Error>> {
fn restore_object(&self, key: &str, _expedited: bool) -> Result<(), Box<dyn std::error::Error>> {
let full_key = format!("{}{}", self.prefix, key);
Ok(self.client.restore_object(&full_key)?)
}

fn is_object_restored(&mut self, key: &str) -> Result<bool, Box<dyn std::error::Error>> {
fn is_object_restored(&self, key: &str) -> Result<bool, Box<dyn std::error::Error>> {
let full_key = format!("{}{}", self.prefix, key);
Ok(self.client.is_object_restored(&full_key)?)
}
Expand Down Expand Up @@ -335,11 +349,7 @@ pub mod s3_session {
);
let test_client = create_test_client(&region, &bucket, &prefix);

TestSession {
mount_dir,
session,
test_client,
}
TestSession::new(mount_dir, session, test_client)
}

/// Create a FUSE mount backed by a real S3 client, with caching
Expand Down Expand Up @@ -371,23 +381,17 @@ pub mod s3_session {
);
let test_client = create_test_client(&region, &bucket, &prefix);

TestSession {
mount_dir,
session,
test_client,
}
TestSession::new(mount_dir, session, test_client)
}
}

fn create_test_client(region: &str, bucket: &str, prefix: &str) -> TestClientBox {
fn create_test_client(region: &str, bucket: &str, prefix: &str) -> impl TestClient {
let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await });
let test_client = SDKTestClient {
SDKTestClient {
prefix: prefix.to_owned(),
bucket: bucket.to_owned(),
sdk_client,
};

Box::new(test_client)
}
}

struct SDKTestClient {
Expand All @@ -398,7 +402,7 @@ pub mod s3_session {

impl TestClient for SDKTestClient {
fn put_object_params(
&mut self,
&self,
key: &str,
value: &[u8],
params: PutObjectParams,
Expand All @@ -420,7 +424,7 @@ pub mod s3_session {
Ok(tokio_block_on(request.send()).map(|_| ())?)
}

fn remove_object(&mut self, key: &str) -> Result<(), Box<dyn std::error::Error>> {
fn remove_object(&self, key: &str) -> Result<(), Box<dyn std::error::Error>> {
let full_key = format!("{}{}", self.prefix, key);
let request = self
.sdk_client
Expand Down Expand Up @@ -512,9 +516,9 @@ pub mod s3_session {
Ok(Some(parts))
}

// Schudule restoration of an object, do not wait until completion. Expidited restoration completes within 1-5 min for GLACIER and is not available for DEEP_ARCHIVE.
// Schedule restoration of an object, do not wait until completion. Expidited restoration completes within 1-5 min for GLACIER and is not available for DEEP_ARCHIVE.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/restoring-objects-retrieval-options.html?icmpid=docs_amazons3_console#restoring-objects-upgrade-tier
fn restore_object(&mut self, key: &str, expedited: bool) -> Result<(), Box<dyn std::error::Error>> {
fn restore_object(&self, key: &str, expedited: bool) -> Result<(), Box<dyn std::error::Error>> {
let full_key = format!("{}{}", self.prefix, key);
let tier = if expedited { Tier::Expedited } else { Tier::Bulk };
let request = self
Expand All @@ -532,7 +536,7 @@ pub mod s3_session {
Ok(tokio_block_on(request).map(|_| ())?)
}

fn is_object_restored(&mut self, key: &str) -> Result<bool, Box<dyn std::error::Error>> {
fn is_object_restored(&self, key: &str) -> Result<bool, Box<dyn std::error::Error>> {
let full_key = format!("{}{}", self.prefix, key);
let head_object = tokio_block_on(self.sdk_client.head_object().bucket(&self.bucket).key(full_key).send())?;
Ok(head_object.restore().unwrap().contains("ongoing-request=\"false\""))
Expand Down
12 changes: 5 additions & 7 deletions mountpoint-s3/tests/direct_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,25 @@ fn cache_and_direct_io_test(creator_fn: impl TestSessionCreator, prefix: &str) {
..Default::default()
};
let test_session = creator_fn(prefix, test_session_conf);
let mount_point = test_session.mount_dir;
let mut test_client = test_session.test_client;

let file_name = "file.bin";

// Create the first version of the file
let old_contents = vec![0x0fu8; OBJECT_SIZE];
test_client.put_object(file_name, &old_contents).unwrap();
test_session.client().put_object(file_name, &old_contents).unwrap();

// Open and read fully the file before updating it remotely
let old_file = File::open(mount_point.path().join(file_name)).unwrap();
let old_file = File::open(test_session.mount_path().join(file_name)).unwrap();
let mut buf = vec![0u8; OBJECT_SIZE];
old_file.read_exact_at(&mut buf, 0).unwrap();
assert_eq!(buf, &old_contents[..buf.len()]);

let new_contents = vec![0xffu8; OBJECT_SIZE];
test_client.put_object(file_name, &new_contents).unwrap();
test_session.client().put_object(file_name, &new_contents).unwrap();

// Open the file again, which should be reading from cache
for _ in 0..2 {
let new_file = File::open(mount_point.path().join(file_name)).unwrap();
let new_file = File::open(test_session.mount_path().join(file_name)).unwrap();
new_file
.read_exact_at(&mut buf, 0)
.expect("should be OK as result is cached");
Expand All @@ -69,7 +67,7 @@ fn cache_and_direct_io_test(creator_fn: impl TestSessionCreator, prefix: &str) {
let new_file = OpenOptions::new()
.read(true)
.custom_flags(libc::O_DIRECT)
.open(mount_point.path().join(file_name))
.open(test_session.mount_path().join(file_name))
.unwrap();
new_file
.read_exact_at(&mut buf, 0)
Expand Down
10 changes: 4 additions & 6 deletions mountpoint-s3/tests/fuse_tests/consistency_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,22 @@ fn page_cache_sharing_test(creator_fn: impl TestSessionCreator, prefix: &str) {
const OBJECT_SIZE: usize = 512 * 1024;

let test_session = creator_fn(prefix, Default::default());
let mount_point = test_session.mount_dir;
let mut test_client = test_session.test_client;

// Create the first version of the file
let old_contents = vec![0xaau8; OBJECT_SIZE];
test_client.put_object("file.bin", &old_contents).unwrap();
test_session.client().put_object("file.bin", &old_contents).unwrap();

// Open the file before updating it remotely
let old_file = File::open(mount_point.path().join("file.bin")).unwrap();
let old_file = File::open(test_session.mount_path().join("file.bin")).unwrap();
let mut buf = vec![0u8; 128];
old_file.read_exact_at(&mut buf, 0).unwrap();
assert_eq!(buf, &old_contents[..buf.len()]);

let new_contents = vec![0xbbu8; OBJECT_SIZE];
test_client.put_object("file.bin", &new_contents).unwrap();
test_session.client().put_object("file.bin", &new_contents).unwrap();

// Open the file again, should see the new contents this time
let new_file = File::open(mount_point.path().join("file.bin")).unwrap();
let new_file = File::open(test_session.mount_path().join("file.bin")).unwrap();
new_file.read_exact_at(&mut buf, 0).unwrap();
assert_eq!(buf, &new_contents[..buf.len()]);

Expand Down
Loading

0 comments on commit e354067

Please sign in to comment.