Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [MR-353] Asynchronously mark checkpoint readonly and sync #3682

Merged
merged 18 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 61 additions & 38 deletions rs/state_layout/src/state_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,6 @@ pub struct CanisterSnapshotBits {
struct StateLayoutMetrics {
state_layout_error_count: IntCounterVec,
state_layout_remove_checkpoint_duration: Histogram,
#[cfg(target_os = "linux")]
state_layout_syncfs_duration: Histogram,
}

impl StateLayoutMetrics {
Expand All @@ -230,12 +228,6 @@ impl StateLayoutMetrics {
"Time elapsed in removing checkpoint.",
decimal_buckets(-3, 1),
),
#[cfg(target_os = "linux")]
state_layout_syncfs_duration: metric_registry.histogram(
"state_layout_syncfs_duration_seconds",
"Time elapsed in syncfs.",
decimal_buckets(-2, 2),
),
}
}
}
Expand Down Expand Up @@ -336,7 +328,7 @@ struct CheckpointRefData {
/// 1. Create state files directly in
/// "<state_root>/fs_tmp/state_sync_scratchpad_<height>".
///
/// 2. When all the writes are complete, call sync_and_mark_files_readonly()
/// 2. When all the writes are complete, call mark_files_readonly_and_sync()
/// on "<state_root>/fs_tmp/state_sync_scratchpad_<height>". This function
/// syncs all the files and directories under the scratchpad directory,
/// including the scratchpad directory itself.
Expand Down Expand Up @@ -531,12 +523,13 @@ impl StateLayout {
) -> Result<(), LayoutError> {
for height in self.checkpoint_heights()? {
let path = self.checkpoint_verified(height)?.raw_path().to_path_buf();
sync_and_mark_files_readonly(&self.log, &path, &self.metrics, thread_pool.as_mut())
.map_err(|err| LayoutError::IoError {
mark_files_readonly_and_sync(&path, thread_pool.as_mut()).map_err(|err| {
ShuoWangNSL marked this conversation as resolved.
Show resolved Hide resolved
LayoutError::IoError {
path,
message: format!("Could not sync and mark readonly checkpoint {}", height),
io_err: err,
})?;
}
})?;
}
Ok(())
}
Expand Down Expand Up @@ -646,26 +639,31 @@ impl StateLayout {
}
}

pub fn scratchpad_to_checkpoint<T>(
/// Creates an unverified marker in the scratchpad and promotes it to a checkpoint.
///
/// This function maintains the integrity of the checkpointing process by ensuring that
/// the scratchpad is properly marked as unverified before transitioning it into a checkpoint.
pub fn promote_scratchpad_to_unverified_checkpoint<T>(
&self,
scratchpad_layout: CheckpointLayout<RwPolicy<'_, T>>,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
scratchpad_layout.create_unverified_checkpoint_marker()?;
self.scratchpad_to_checkpoint(scratchpad_layout, height)
}

fn scratchpad_to_checkpoint<T>(
&self,
layout: CheckpointLayout<RwPolicy<'_, T>>,
height: Height,
thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
ShuoWangNSL marked this conversation as resolved.
Show resolved Hide resolved
// The scratchpad must have an unverified marker before it is promoted to a checkpoint.
debug_assert!(!layout.is_checkpoint_verified());
debug_assert_eq!(height, layout.height());
let scratchpad = layout.raw_path();
let checkpoints_path = self.checkpoints();
let cp_path = checkpoints_path.join(Self::checkpoint_name(height));
sync_and_mark_files_readonly(&self.log, scratchpad, &self.metrics, thread_pool).map_err(
|err| LayoutError::IoError {
path: scratchpad.to_path_buf(),
message: format!(
"Could not sync and mark readonly scratchpad for checkpoint {}",
height
),
io_err: err,
},
)?;

std::fs::rename(scratchpad, cp_path).map_err(|err| {
if is_already_exists_err(&err) {
LayoutError::AlreadyExists(height)
Expand Down Expand Up @@ -705,7 +703,10 @@ impl StateLayout {

/// Returns the layout of the checkpoint with the given height.
/// If the checkpoint is not found, an error is returned.
fn checkpoint(&self, height: Height) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
fn checkpoint<Permissions: AccessPolicy>(
&self,
height: Height,
) -> Result<CheckpointLayout<Permissions>, LayoutError> {
ShuoWangNSL marked this conversation as resolved.
Show resolved Hide resolved
let cp_name = Self::checkpoint_name(height);
let path = self.checkpoints().join(cp_name);
if !path.exists() {
Expand Down Expand Up @@ -734,7 +735,7 @@ impl StateLayout {
}
}
}
CheckpointLayout::new(path, height, self.clone())
CheckpointLayout::<Permissions>::new(path, height, self.clone())
}

/// Returns the layout of a verified checkpoint with the given height.
Expand All @@ -743,7 +744,7 @@ impl StateLayout {
&self,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
let cp = self.checkpoint(height)?;
let cp = self.checkpoint::<ReadOnly>(height)?;
if !cp.is_checkpoint_verified() {
return Err(LayoutError::CheckpointUnverified(height));
};
Expand All @@ -759,7 +760,7 @@ impl StateLayout {
&self,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
self.checkpoint(height)
self.checkpoint::<ReadOnly>(height)
}

/// Returns if a checkpoint with the given height is verified or not.
Expand Down Expand Up @@ -1553,6 +1554,22 @@ impl<Permissions: AccessPolicy> CheckpointLayout<Permissions> {
pub fn is_checkpoint_verified(&self) -> bool {
!self.unverified_checkpoint_marker().exists()
}

pub fn mark_files_readonly_and_sync(
&self,
thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<(), LayoutError> {
mark_files_readonly_and_sync(self.raw_path(), thread_pool).map_err(|err| {
LayoutError::IoError {
path: self.raw_path().to_path_buf(),
message: format!(
"Could not mark files readonly and sync for checkpoint {}",
self.height()
),
io_err: err,
}
})
}
}

impl<P> CheckpointLayout<P>
Expand Down Expand Up @@ -1585,7 +1602,7 @@ impl CheckpointLayout<ReadOnly> {
/// A readonly checkpoint typically prevents modification to the files in the checkpoint.
/// However, the removal of the unverified checkpoint marker is allowed as
/// the marker is not part the checkpoint conceptually.
pub fn remove_unverified_checkpoint_marker(&self) -> Result<(), LayoutError> {
fn remove_unverified_checkpoint_marker(&self) -> Result<(), LayoutError> {
let marker = self.unverified_checkpoint_marker();
if !marker.exists() {
return Ok(());
Expand All @@ -1609,6 +1626,20 @@ impl CheckpointLayout<ReadOnly> {
io_err: err,
})
}

/// Finalizes the checkpoint by marking all files as read-only, ensuring
/// they are fully synchronized to disk, and then removing the unverified checkpoint marker.
///
/// This function is necessary due to the asynchronous checkpoint writing.
/// Marking the files as read-only and performing a sync operation should be the last step
/// before removing the unverified checkpoint marker to prevent data inconsistencies.
pub fn finalize_and_remove_unverified_marker(
&self,
thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<(), LayoutError> {
self.mark_files_readonly_and_sync(thread_pool)?;
self.remove_unverified_checkpoint_marker()
}
}

pub struct PageMapLayout<Permissions: AccessPolicy> {
Expand Down Expand Up @@ -2723,10 +2754,8 @@ fn dir_list_recursive(path: &Path) -> std::io::Result<Vec<PathBuf>> {

/// Recursively set permissions to readonly for all files under the given
/// `path`.
fn sync_and_mark_files_readonly(
#[allow(unused)] log: &ReplicaLogger,
fn mark_files_readonly_and_sync(
path: &Path,
#[allow(unused)] metrics: &StateLayoutMetrics,
mut thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> std::io::Result<()> {
let paths = dir_list_recursive(path)?;
Expand All @@ -2742,17 +2771,11 @@ fn sync_and_mark_files_readonly(
{
let f = std::fs::File::open(path)?;
use std::os::fd::AsRawFd;
let start = Instant::now();
unsafe {
if libc::syncfs(f.as_raw_fd()) == -1 {
return Err(std::io::Error::last_os_error());
}
}
let elapsed = start.elapsed();
metrics
.state_layout_syncfs_duration
.observe(elapsed.as_secs_f64());
info!(log, "syncfs took {:?}", elapsed);
}
Ok(())
}
Expand Down
32 changes: 17 additions & 15 deletions rs/state_layout/src/state_layout/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,44 +279,43 @@ fn test_removal_when_last_dropped() {
let state_layout = StateLayout::try_new(log, root_path, &metrics_registry).unwrap();
let scratchpad_dir = tmpdir("scratchpad");
let cp1 = state_layout
.scratchpad_to_checkpoint(
.promote_scratchpad_to_unverified_checkpoint(
CheckpointLayout::<RwPolicy<()>>::new_untracked(
scratchpad_dir.path().to_path_buf().join("1"),
Height::new(1),
)
.unwrap(),
Height::new(1),
None,
)
.unwrap();
cp1.finalize_and_remove_unverified_marker(None).unwrap();
let cp2 = state_layout
.scratchpad_to_checkpoint(
.promote_scratchpad_to_unverified_checkpoint(
CheckpointLayout::<RwPolicy<()>>::new_untracked(
scratchpad_dir.path().to_path_buf().join("2"),
Height::new(2),
)
.unwrap(),
Height::new(2),
None,
)
.unwrap();
cp2.finalize_and_remove_unverified_marker(None).unwrap();
// Add one checkpoint so that we never remove the last one and crash
let _cp3 = state_layout
.scratchpad_to_checkpoint(
let cp3 = state_layout
.promote_scratchpad_to_unverified_checkpoint(
CheckpointLayout::<RwPolicy<()>>::new_untracked(
scratchpad_dir.path().to_path_buf().join("3"),
Height::new(3),
)
.unwrap(),
Height::new(3),
None,
)
.unwrap();
cp3.finalize_and_remove_unverified_marker(None).unwrap();
assert_eq!(
vec![Height::new(1), Height::new(2), Height::new(3)],
state_layout.checkpoint_heights().unwrap(),
);

std::mem::drop(cp1);
state_layout.remove_checkpoint_when_unused(Height::new(1));
state_layout.remove_checkpoint_when_unused(Height::new(2));
Expand Down Expand Up @@ -344,16 +343,16 @@ fn test_last_removal_panics_in_debug() {
let state_layout = StateLayout::try_new(log, root_path, &metrics_registry).unwrap();
let scratchpad_dir = tmpdir("scratchpad");
let cp1 = state_layout
.scratchpad_to_checkpoint(
.promote_scratchpad_to_unverified_checkpoint(
CheckpointLayout::<RwPolicy<()>>::new_untracked(
scratchpad_dir.path().to_path_buf().join("1"),
Height::new(1),
)
.unwrap(),
Height::new(1),
None,
)
.unwrap();
cp1.finalize_and_remove_unverified_marker(None).unwrap();
state_layout.remove_checkpoint_when_unused(Height::new(1));
std::mem::drop(cp1);
});
Expand All @@ -376,7 +375,7 @@ fn test_can_remove_unverified_marker_file_twice() {
CheckpointLayout::<RwPolicy<()>>::new_untracked(state_sync_scratchpad, height)
.expect("failed to create checkpoint layout");
// Create at least a file in the scratchpad layout. Otherwise, empty folders can be overridden without errors
// and calling "scratchpad_to_checkpoint" twice will not fail as expected.
// and calling "promote_scratchpad_to_unverified_checkpoint" twice will not fail as expected.
File::create(scratchpad_layout.raw_path().join(SYSTEM_METADATA_FILE)).unwrap();

let tip_path = state_layout.tip_path();
Expand All @@ -391,19 +390,22 @@ fn test_can_remove_unverified_marker_file_twice() {
tip.create_unverified_checkpoint_marker().unwrap();

let checkpoint = state_layout
.scratchpad_to_checkpoint(scratchpad_layout, height, None)
.promote_scratchpad_to_unverified_checkpoint(scratchpad_layout, height)
.unwrap();
checkpoint
.finalize_and_remove_unverified_marker(None)
.unwrap();
checkpoint.remove_unverified_checkpoint_marker().unwrap();

// The checkpoint already exists, therefore promoting the tip to checkpoint should fail.
// However, it can still access the checkpoint and try to remove the marker file again from its side.
let checkpoint_result = state_layout.scratchpad_to_checkpoint(tip, height, None);
let checkpoint_result =
state_layout.promote_scratchpad_to_unverified_checkpoint(tip, height);
assert!(checkpoint_result.is_err());

let res = state_layout
.checkpoint_in_verification(height)
.unwrap()
.remove_unverified_checkpoint_marker();
.finalize_and_remove_unverified_marker(None);
assert!(res.is_ok());
});
}
Expand Down
17 changes: 9 additions & 8 deletions rs/state_manager/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub(crate) fn make_unvalidated_checkpoint(
Ok((cp, has_downgrade))
}

pub(crate) fn validate_checkpoint_and_remove_unverified_marker(
pub(crate) fn validate_and_finalize_checkpoint_and_remove_unverified_marker(
checkpoint_layout: &CheckpointLayout<ReadOnly>,
reference_state: Option<&ReplicatedState>,
own_subnet_type: SubnetType,
Expand All @@ -117,13 +117,13 @@ pub(crate) fn validate_checkpoint_and_remove_unverified_marker(
checkpoint_layout,
reference_state,
own_subnet_type,
thread_pool,
&mut thread_pool,
fd_factory,
metrics,
);
}
checkpoint_layout
.remove_unverified_checkpoint_marker()
.finalize_and_remove_unverified_marker(thread_pool)
.map_err(CheckpointError::from)?;
Ok(())
}
Expand All @@ -146,7 +146,8 @@ pub fn load_checkpoint_and_validate_parallel(
Some(&mut thread_pool),
Arc::clone(&fd_factory),
)?;
validate_checkpoint_and_remove_unverified_marker(

validate_and_finalize_checkpoint_and_remove_unverified_marker(
checkpoint_layout,
None,
own_subnet_type,
Expand Down Expand Up @@ -411,7 +412,7 @@ pub fn validate_eq_checkpoint(
checkpoint_layout: &CheckpointLayout<ReadOnly>,
reference_state: &ReplicatedState,
own_subnet_type: SubnetType,
thread_pool: Option<&mut scoped_threadpool::Pool>,
thread_pool: &mut Option<&mut scoped_threadpool::Pool>,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>, //
metrics: &CheckpointMetrics, // Make optional in the loader & don't provide?
) {
Expand All @@ -438,7 +439,7 @@ fn validate_eq_checkpoint_internal(
checkpoint_layout: &CheckpointLayout<ReadOnly>,
reference_state: &ReplicatedState,
own_subnet_type: SubnetType,
mut thread_pool: Option<&mut scoped_threadpool::Pool>,
thread_pool: &mut Option<&mut scoped_threadpool::Pool>,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>, //
metrics: &CheckpointMetrics, // Make optional in the loader & don't provide?
) -> Result<(), String> {
Expand All @@ -458,7 +459,7 @@ fn validate_eq_checkpoint_internal(
fd_factory,
};

checkpoint_loader.validate_eq_canister_states(&mut thread_pool, canister_states)?;
checkpoint_loader.validate_eq_canister_states(thread_pool, canister_states)?;
checkpoint_loader
.load_system_metadata()
.map_err(|err| format!("Failed to load system metadata: {}", err))?
Expand All @@ -473,7 +474,7 @@ fn validate_eq_checkpoint_internal(
if !consensus_queue.is_empty() {
return Err("consensus_queue is not empty".to_string());
}
checkpoint_loader.validate_eq_canister_snapshots(&mut thread_pool, canister_snapshots)
checkpoint_loader.validate_eq_canister_snapshots(thread_pool, canister_snapshots)
}

#[derive(Default)]
Expand Down
Loading