Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [MR-353] Asynchronously mark checkpoint readonly and sync #3682

Merged
merged 18 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 45 additions & 28 deletions rs/state_layout/src/state_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ struct CheckpointRefData {
/// 1. Create state files directly in
/// "<state_root>/fs_tmp/state_sync_scratchpad_<height>".
///
/// 2. When all the writes are complete, call sync_and_mark_files_readonly()
/// 2. When all the writes are complete, call mark_files_readonly_and_sync()
/// on "<state_root>/fs_tmp/state_sync_scratchpad_<height>". This function
/// syncs all the files and directories under the scratchpad directory,
/// including the scratchpad directory itself.
Expand Down Expand Up @@ -545,12 +545,13 @@ impl StateLayout {
) -> Result<(), LayoutError> {
for height in self.checkpoint_heights()? {
let path = self.checkpoint_verified(height)?.raw_path().to_path_buf();
sync_and_mark_files_readonly(&self.log, &path, &self.metrics, thread_pool.as_mut())
.map_err(|err| LayoutError::IoError {
mark_files_readonly_and_sync(&path, thread_pool.as_mut()).map_err(|err| {
ShuoWangNSL marked this conversation as resolved.
Show resolved Hide resolved
LayoutError::IoError {
path,
message: format!("Could not sync and mark readonly checkpoint {}", height),
io_err: err,
})?;
}
})?;
}
Ok(())
}
Expand Down Expand Up @@ -664,22 +665,12 @@ impl StateLayout {
&self,
layout: CheckpointLayout<RwPolicy<'_, T>>,
height: Height,
thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
ShuoWangNSL marked this conversation as resolved.
Show resolved Hide resolved
debug_assert_eq!(height, layout.height());
let scratchpad = layout.raw_path();
let checkpoints_path = self.checkpoints();
let cp_path = checkpoints_path.join(Self::checkpoint_name(height));
sync_and_mark_files_readonly(&self.log, scratchpad, &self.metrics, thread_pool).map_err(
|err| LayoutError::IoError {
path: scratchpad.to_path_buf(),
message: format!(
"Could not sync and mark readonly scratchpad for checkpoint {}",
height
),
io_err: err,
},
)?;

std::fs::rename(scratchpad, cp_path).map_err(|err| {
if is_already_exists_err(&err) {
LayoutError::AlreadyExists(height)
Expand Down Expand Up @@ -719,7 +710,10 @@ impl StateLayout {

/// Returns the layout of the checkpoint with the given height.
/// If the checkpoint is not found, an error is returned.
fn checkpoint(&self, height: Height) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
fn checkpoint<Permissions: AccessPolicy>(
&self,
height: Height,
) -> Result<CheckpointLayout<Permissions>, LayoutError> {
ShuoWangNSL marked this conversation as resolved.
Show resolved Hide resolved
let cp_name = Self::checkpoint_name(height);
let path = self.checkpoints().join(cp_name);
if !path.exists() {
Expand Down Expand Up @@ -748,7 +742,7 @@ impl StateLayout {
}
}
}
CheckpointLayout::new(path, height, self.clone())
CheckpointLayout::<Permissions>::new(path, height, self.clone())
}

/// Returns the layout of a verified checkpoint with the given height.
Expand All @@ -757,7 +751,7 @@ impl StateLayout {
&self,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
let cp = self.checkpoint(height)?;
let cp = self.checkpoint::<ReadOnly>(height)?;
if !cp.is_checkpoint_verified() {
return Err(LayoutError::CheckpointUnverified(height));
};
Expand All @@ -773,7 +767,7 @@ impl StateLayout {
&self,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
self.checkpoint(height)
self.checkpoint::<ReadOnly>(height)
}

/// Returns if a checkpoint with the given height is verified or not.
Expand Down Expand Up @@ -1567,6 +1561,22 @@ impl<Permissions: AccessPolicy> CheckpointLayout<Permissions> {
pub fn is_checkpoint_verified(&self) -> bool {
    // A checkpoint counts as verified exactly when its unverified-checkpoint
    // marker is absent from disk.
    let marker_present = self.unverified_checkpoint_marker().exists();
    !marker_present
}

/// Marks the files under this checkpoint's raw path readonly and syncs them.
///
/// Delegates to the module-level `mark_files_readonly_and_sync` helper,
/// forwarding the optional `thread_pool` to it.
///
/// # Errors
///
/// Any I/O error reported by the helper is wrapped into
/// `LayoutError::IoError`, carrying the checkpoint path and a message that
/// names the checkpoint height.
pub fn mark_files_readonly_and_sync(
    &self,
    thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<(), LayoutError> {
    let checkpoint_path = self.raw_path();
    match mark_files_readonly_and_sync(checkpoint_path, thread_pool) {
        Ok(()) => Ok(()),
        Err(io_err) => Err(LayoutError::IoError {
            path: checkpoint_path.to_path_buf(),
            message: format!(
                "Could not mark files readonly and sync for checkpoint {}",
                self.height()
            ),
            io_err,
        }),
    }
}
}

impl<P> CheckpointLayout<P>
Expand Down Expand Up @@ -1599,7 +1609,7 @@ impl CheckpointLayout<ReadOnly> {
/// A readonly checkpoint typically prevents modification to the files in the checkpoint.
/// However, the removal of the unverified checkpoint marker is allowed as
/// the marker is not part of the checkpoint conceptually.
pub fn remove_unverified_checkpoint_marker(&self) -> Result<(), LayoutError> {
fn remove_unverified_checkpoint_marker(&self) -> Result<(), LayoutError> {
let marker = self.unverified_checkpoint_marker();
if !marker.exists() {
return Ok(());
Expand All @@ -1623,6 +1633,20 @@ impl CheckpointLayout<ReadOnly> {
io_err: err,
})
}

/// Finalizes the checkpoint: first marks all of its files read-only and
/// syncs them to disk, and only then removes the unverified checkpoint marker.
///
/// Checkpoints are written asynchronously, so the readonly-and-sync step
/// has to be the very last thing that happens before the marker goes away;
/// otherwise the data could end up inconsistent.
///
/// # Errors
///
/// Returns the `LayoutError` of whichever step fails first. The marker is
/// not removed if marking the files readonly or syncing them fails.
pub fn finalize_and_remove_unverified_marker(
    &self,
    thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<(), LayoutError> {
    self.mark_files_readonly_and_sync(thread_pool)
        .and_then(|()| self.remove_unverified_checkpoint_marker())
}
}

pub struct PageMapLayout<Permissions: AccessPolicy> {
Expand Down Expand Up @@ -2744,10 +2768,8 @@ fn dir_list_recursive(path: &Path) -> std::io::Result<Vec<PathBuf>> {

/// Recursively set permissions to readonly for all files under the given
/// `path`, and sync them to disk.
fn sync_and_mark_files_readonly(
#[allow(unused)] log: &ReplicaLogger,
fn mark_files_readonly_and_sync(
path: &Path,
#[allow(unused)] metrics: &StateLayoutMetrics,
mut thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> std::io::Result<()> {
let paths = dir_list_recursive(path)?;
Expand All @@ -2769,11 +2791,6 @@ fn sync_and_mark_files_readonly(
return Err(std::io::Error::last_os_error());
}
}
let elapsed = start.elapsed();
metrics
.state_layout_syncfs_duration
.observe(elapsed.as_secs_f64());
info!(log, "syncfs took {:?}", elapsed);
}
Ok(())
}
Expand Down
14 changes: 6 additions & 8 deletions rs/state_layout/src/state_layout/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ fn test_removal_when_last_dropped() {
)
.unwrap(),
Height::new(1),
None,
)
.unwrap();
let cp2 = state_layout
Expand All @@ -297,7 +296,6 @@ fn test_removal_when_last_dropped() {
)
.unwrap(),
Height::new(2),
None,
)
.unwrap();
// Add one checkpoint so that we never remove the last one and crash
Expand All @@ -309,7 +307,6 @@ fn test_removal_when_last_dropped() {
)
.unwrap(),
Height::new(3),
None,
)
.unwrap();
assert_eq!(
Expand Down Expand Up @@ -351,7 +348,6 @@ fn test_last_removal_panics_in_debug() {
)
.unwrap(),
Height::new(1),
None,
)
.unwrap();
state_layout.remove_checkpoint_when_unused(Height::new(1));
Expand Down Expand Up @@ -391,19 +387,21 @@ fn test_can_remove_unverified_marker_file_twice() {
tip.create_unverified_checkpoint_marker().unwrap();

let checkpoint = state_layout
.scratchpad_to_checkpoint(scratchpad_layout, height, None)
.scratchpad_to_checkpoint(scratchpad_layout, height)
.unwrap();
checkpoint
.finalize_and_remove_unverified_marker(None)
.unwrap();
checkpoint.remove_unverified_checkpoint_marker().unwrap();

// The checkpoint already exists, therefore promoting the tip to checkpoint should fail.
// However, it can still access the checkpoint and try to remove the marker file again from its side.
let checkpoint_result = state_layout.scratchpad_to_checkpoint(tip, height, None);
let checkpoint_result = state_layout.scratchpad_to_checkpoint(tip, height);
assert!(checkpoint_result.is_err());

let res = state_layout
.checkpoint_in_verification(height)
.unwrap()
.remove_unverified_checkpoint_marker();
.finalize_and_remove_unverified_marker(None);
assert!(res.is_ok());
});
}
Expand Down
43 changes: 11 additions & 32 deletions rs/state_manager/src/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crossbeam_channel::{unbounded, Sender};
use ic_base_types::{subnet_id_try_from_protobuf, CanisterId, SnapshotId};
use ic_config::flag_status::FlagStatus;
use ic_logger::error;
use ic_logger::{error, fatal};
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::canister_snapshots::{
CanisterSnapshot, CanisterSnapshots, ExecutionStateSnapshot, PageMemory,
Expand All @@ -15,7 +15,7 @@ use ic_replicated_state::{
};
use ic_replicated_state::{CheckpointLoadingMetrics, Memory};
use ic_state_layout::{
CanisterLayout, CanisterSnapshotBits, CanisterStateBits, CheckpointLayout, ReadOnly,
CanisterLayout, CanisterSnapshotBits, CanisterStateBits, CheckpointLayout, ReadOnly, RwPolicy,
SnapshotLayout,
};
use ic_types::batch::RawQueryStats;
Expand Down Expand Up @@ -93,35 +93,13 @@ pub(crate) fn make_unvalidated_checkpoint(
})
.unwrap();
let (cp, has_downgrade) = recv.recv().unwrap()?;
// With lsmt storage, ResetTipAndMerge happens later (after manifest).
if lsmt_storage == FlagStatus::Disabled {
tip_channel
.send(TipRequest::ResetTipAndMerge {
checkpoint_layout: cp.clone(),
pagemaptypes: PageMapType::list_all_including_snapshots(state),
is_initializing_tip: false,
})
.unwrap();
}
(cp, has_downgrade)
};

if lsmt_storage == FlagStatus::Disabled {
// Wait for reset_tip_to so that we don't reflink in parallel with other operations.
let _timer = metrics
.make_checkpoint_step_duration
.with_label_values(&["wait_for_reflinking"])
.start_timer();
#[allow(clippy::disallowed_methods)]
let (send, recv) = unbounded();
tip_channel.send(TipRequest::Wait { sender: send }).unwrap();
recv.recv().unwrap();
}

Ok((cp, has_downgrade))
}

pub(crate) fn validate_checkpoint_and_remove_unverified_marker(
pub(crate) fn validate_and_finalize_checkpoint_and_remove_unverified_marker(
checkpoint_layout: &CheckpointLayout<ReadOnly>,
reference_state: Option<&ReplicatedState>,
own_subnet_type: SubnetType,
Expand All @@ -141,13 +119,13 @@ pub(crate) fn validate_checkpoint_and_remove_unverified_marker(
checkpoint_layout,
reference_state,
own_subnet_type,
thread_pool,
&mut thread_pool,
fd_factory,
metrics,
);
}
checkpoint_layout
.remove_unverified_checkpoint_marker()
.finalize_and_remove_unverified_marker(thread_pool)
.map_err(CheckpointError::from)?;
Ok(())
}
Expand All @@ -170,7 +148,8 @@ pub fn load_checkpoint_and_validate_parallel(
Some(&mut thread_pool),
Arc::clone(&fd_factory),
)?;
validate_checkpoint_and_remove_unverified_marker(

validate_and_finalize_checkpoint_and_remove_unverified_marker(
checkpoint_layout,
None,
own_subnet_type,
Expand Down Expand Up @@ -435,7 +414,7 @@ pub fn validate_eq_checkpoint(
checkpoint_layout: &CheckpointLayout<ReadOnly>,
reference_state: &ReplicatedState,
own_subnet_type: SubnetType,
thread_pool: Option<&mut scoped_threadpool::Pool>,
thread_pool: &mut Option<&mut scoped_threadpool::Pool>,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>, //
metrics: &CheckpointMetrics, // Make optional in the loader & don't provide?
) {
Expand All @@ -462,7 +441,7 @@ fn validate_eq_checkpoint_internal(
checkpoint_layout: &CheckpointLayout<ReadOnly>,
reference_state: &ReplicatedState,
own_subnet_type: SubnetType,
mut thread_pool: Option<&mut scoped_threadpool::Pool>,
thread_pool: &mut Option<&mut scoped_threadpool::Pool>,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>, //
metrics: &CheckpointMetrics, // Make optional in the loader & don't provide?
) -> Result<(), String> {
Expand All @@ -482,7 +461,7 @@ fn validate_eq_checkpoint_internal(
fd_factory,
};

checkpoint_loader.validate_eq_canister_states(&mut thread_pool, canister_states)?;
checkpoint_loader.validate_eq_canister_states(thread_pool, canister_states)?;
checkpoint_loader
.load_system_metadata()
.map_err(|err| format!("Failed to load system metadata: {}", err))?
Expand All @@ -497,7 +476,7 @@ fn validate_eq_checkpoint_internal(
if !consensus_queue.is_empty() {
return Err("consensus_queue is not empty".to_string());
}
checkpoint_loader.validate_eq_canister_snapshots(&mut thread_pool, canister_snapshots)
checkpoint_loader.validate_eq_canister_snapshots(thread_pool, canister_snapshots)
}

#[derive(Default)]
Expand Down
Loading
Loading