Skip to content

Commit

Permalink
async MarkCheckpointReadOnlyAndSync
Browse files Browse the repository at this point in the history
  • Loading branch information
ShuoWangNSL committed Jan 30, 2025
1 parent 35f3948 commit 756144d
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 34 deletions.
62 changes: 44 additions & 18 deletions rs/state_layout/src/state_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ struct CheckpointRefData {
/// 1. Create state files directly in
/// "<state_root>/fs_tmp/state_sync_scratchpad_<height>".
///
/// 2. When all the writes are complete, call sync_and_mark_files_readonly()
/// 2. When all the writes are complete, call mark_files_readonly_and_sync()
/// on "<state_root>/fs_tmp/state_sync_scratchpad_<height>". This function
/// syncs all the files and directories under the scratchpad directory,
/// including the scratchpad directory itself.
Expand Down Expand Up @@ -545,7 +545,7 @@ impl StateLayout {
) -> Result<(), LayoutError> {
for height in self.checkpoint_heights()? {
let path = self.checkpoint_verified(height)?.raw_path().to_path_buf();
sync_and_mark_files_readonly(&self.log, &path, &self.metrics, thread_pool.as_mut())
mark_files_readonly_and_sync(&self.log, &path, &self.metrics, thread_pool.as_mut())
.map_err(|err| LayoutError::IoError {
path,
message: format!("Could not sync and mark readonly checkpoint {}", height),
Expand Down Expand Up @@ -664,22 +664,12 @@ impl StateLayout {
&self,
layout: CheckpointLayout<RwPolicy<'_, T>>,
height: Height,
thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
debug_assert_eq!(height, layout.height());
let scratchpad = layout.raw_path();
let checkpoints_path = self.checkpoints();
let cp_path = checkpoints_path.join(Self::checkpoint_name(height));
sync_and_mark_files_readonly(&self.log, scratchpad, &self.metrics, thread_pool).map_err(
|err| LayoutError::IoError {
path: scratchpad.to_path_buf(),
message: format!(
"Could not sync and mark readonly scratchpad for checkpoint {}",
height
),
io_err: err,
},
)?;

std::fs::rename(scratchpad, cp_path).map_err(|err| {
if is_already_exists_err(&err) {
LayoutError::AlreadyExists(height)
Expand Down Expand Up @@ -719,7 +709,10 @@ impl StateLayout {

/// Returns the layout of the checkpoint with the given height.
/// If the checkpoint is not found, an error is returned.
fn checkpoint(&self, height: Height) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
fn checkpoint<Permissions: AccessPolicy>(
&self,
height: Height,
) -> Result<CheckpointLayout<Permissions>, LayoutError> {
let cp_name = Self::checkpoint_name(height);
let path = self.checkpoints().join(cp_name);
if !path.exists() {
Expand Down Expand Up @@ -748,7 +741,7 @@ impl StateLayout {
}
}
}
CheckpointLayout::new(path, height, self.clone())
CheckpointLayout::<Permissions>::new(path, height, self.clone())
}

/// Returns the layout of a verified checkpoint with the given height.
Expand All @@ -757,7 +750,7 @@ impl StateLayout {
&self,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
let cp = self.checkpoint(height)?;
let cp = self.checkpoint::<ReadOnly>(height)?;
if !cp.is_checkpoint_verified() {
return Err(LayoutError::CheckpointUnverified(height));
};
Expand All @@ -773,7 +766,19 @@ impl StateLayout {
&self,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
self.checkpoint(height)
self.checkpoint::<ReadOnly>(height)
}

/// Returns a read-write layout of the checkpoint at `height`, intended for
/// asynchronously writing the proto files into an already created checkpoint.
///
/// Draft notes:
/// - The layout is read-write so that the proto files can still be written.
/// - Unlike the untracked read-write tip, this checkpoint is still tracked,
///   which guards it against accidental deletion. `remove_states_below`
///   removes checkpoints based on the states metadata, but it is better not
///   to rely solely on that implementation.
/// - Ideally, write access would be limited to the proto files only. There is
///   no such fine-grained control yet; whether to introduce a dedicated
///   access policy for this is still to be discussed.
///
/// # Errors
/// Propagates any `LayoutError` returned by `checkpoint`.
pub fn checkpoint_in_async_writing(
    &self,
    height: Height,
) -> Result<CheckpointLayout<RwPolicy<()>>, LayoutError> {
    // The access policy is inferred from this function's return type.
    self.checkpoint(height)
}

/// Returns if a checkpoint with the given height is verified or not.
Expand Down Expand Up @@ -1182,6 +1187,27 @@ impl StateLayout {
std::fs::create_dir_all(p)
}

/// Marks all files under `path` as read-only and syncs them.
///
/// This is a thin wrapper around the module-level
/// `mark_files_readonly_and_sync` helper: it supplies this layout's metrics
/// and converts a failure of the helper into `LayoutError::IoError`.
///
/// Draft notes: this function could arguably be implemented for
/// `CheckpointLayout` instead of `StateLayout`, but there is no simple
/// workaround for obtaining the required `StateLayoutMetrics` there.
///
/// # Errors
/// Returns `LayoutError::IoError` carrying `path` and the underlying I/O
/// error if the helper fails.
pub fn mark_files_readonly_and_sync(
    &self,
    log: &ReplicaLogger,
    path: &Path,
    thread_pool: Option<&mut scoped_threadpool::Pool>,
) -> Result<(), LayoutError> {
    mark_files_readonly_and_sync(log, path, &self.metrics, thread_pool).map_err(|err| {
        LayoutError::IoError {
            // Report the path that was actually processed, so that the error
            // points at the directory on which the operation failed.
            path: path.to_path_buf(),
            // NOTE(review): confirm that a `height()` accessor is available
            // on this type; the message also still says "scratchpad" although
            // callers pass checkpoint paths.
            message: format!(
                "Could not sync and mark readonly scratchpad for checkpoint {}",
                self.height()
            ),
            io_err: err,
        }
    })
}

/// Atomically copies a checkpoint with the specified name located at src
/// path into the specified dst path.
///
Expand Down Expand Up @@ -2744,7 +2770,7 @@ fn dir_list_recursive(path: &Path) -> std::io::Result<Vec<PathBuf>> {

/// Recursively set permissions to readonly for all files under the given
/// `path`.
fn sync_and_mark_files_readonly(
fn mark_files_readonly_and_sync(
#[allow(unused)] log: &ReplicaLogger,
path: &Path,
#[allow(unused)] metrics: &StateLayoutMetrics,
Expand Down
8 changes: 2 additions & 6 deletions rs/state_layout/src/state_layout/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ fn test_removal_when_last_dropped() {
)
.unwrap(),
Height::new(1),
None,
)
.unwrap();
let cp2 = state_layout
Expand All @@ -297,7 +296,6 @@ fn test_removal_when_last_dropped() {
)
.unwrap(),
Height::new(2),
None,
)
.unwrap();
// Add one checkpoint so that we never remove the last one and crash
Expand All @@ -309,7 +307,6 @@ fn test_removal_when_last_dropped() {
)
.unwrap(),
Height::new(3),
None,
)
.unwrap();
assert_eq!(
Expand Down Expand Up @@ -351,7 +348,6 @@ fn test_last_removal_panics_in_debug() {
)
.unwrap(),
Height::new(1),
None,
)
.unwrap();
state_layout.remove_checkpoint_when_unused(Height::new(1));
Expand Down Expand Up @@ -391,13 +387,13 @@ fn test_can_remove_unverified_marker_file_twice() {
tip.create_unverified_checkpoint_marker().unwrap();

let checkpoint = state_layout
.scratchpad_to_checkpoint(scratchpad_layout, height, None)
.scratchpad_to_checkpoint(scratchpad_layout, height)
.unwrap();
checkpoint.remove_unverified_checkpoint_marker().unwrap();

// The checkpoint already exists, therefore promoting the tip to checkpoint should fail.
// However, it can still access the checkpoint and try to remove the marker file again from its side.
let checkpoint_result = state_layout.scratchpad_to_checkpoint(tip, height, None);
let checkpoint_result = state_layout.scratchpad_to_checkpoint(tip, height);
assert!(checkpoint_result.is_err());

let res = state_layout
Expand Down
3 changes: 2 additions & 1 deletion rs/state_manager/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ic_replicated_state::{
};
use ic_replicated_state::{CheckpointLoadingMetrics, Memory};
use ic_state_layout::{
CanisterLayout, CanisterSnapshotBits, CanisterStateBits, CheckpointLayout, ReadOnly,
CanisterLayout, CanisterSnapshotBits, CanisterStateBits, CheckpointLayout, ReadOnly, RwPolicy,
SnapshotLayout,
};
use ic_types::batch::RawQueryStats;
Expand Down Expand Up @@ -170,6 +170,7 @@ pub fn load_checkpoint_and_validate_parallel(
Some(&mut thread_pool),
Arc::clone(&fd_factory),
)?;

validate_checkpoint_and_remove_unverified_marker(
checkpoint_layout,
None,
Expand Down
20 changes: 20 additions & 0 deletions rs/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2573,6 +2573,7 @@ impl StateManagerImpl {
self.lsmt_status,
)
};
let _checkpoint_layout_for_async_writing = self.state_layout.checkpoint_in_async_writing(height);
let (cp_layout, has_downgrade) = match result {
Ok(response) => response,
Err(CheckpointError::AlreadyExists(_)) => {
Expand Down Expand Up @@ -2637,6 +2638,25 @@ impl StateManagerImpl {
},
);
}

// `_checkpoint_layout_for_async_writing` could be used here to serialize the protos to the checkpoint:
// self.tip_channel
// .send(TipRequest::SerializeProtosToCheckpoint {
// checkpoint_layout: _checkpoint_layout_for_async_writing,
// replicated_state: Arc::clone(&state),
// })
// .unwrap();

// Draft notes:
// Until we come up with a new policy, the ReadOnly checkpoint layout is still used below in MarkCheckpointReadOnlyAndSync and ValidateReplicatedState.
// The order of marking the checkpoint read-only, syncing, loading, and validating is still to be determined. These steps could be merged into one request.
// Removing the marker should be the last step; it currently happens in ValidateReplicatedState.
self.tip_channel
.send(TipRequest::MarkCheckpointReadOnlyAndSync {
checkpoint_layout: cp_layout.clone(),
})
.expect("Failed to send Validate request");

let state = Arc::new(state);
self.tip_channel
.send(TipRequest::ValidateReplicatedState {
Expand Down
11 changes: 8 additions & 3 deletions rs/state_manager/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{

use ic_base_types::CanisterId;
use ic_config::state_manager::Config;
use ic_logger::ReplicaLogger;
use ic_logger::{fatal, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use ic_registry_routing_table::{
difference, CanisterIdRange, CanisterIdRanges, RoutingTable, WellFormedError,
Expand Down Expand Up @@ -197,9 +197,9 @@ fn write_checkpoint(
)
.map_err(|e| e.to_string())?;
let (_tip_thread, tip_channel) = spawn_tip_thread(
log,
log.clone(),
tip_handler,
state_layout,
state_layout.clone(),
config.lsmt_config.clone(),
metrics.clone(),
MaliciousFlags::default(),
Expand All @@ -223,6 +223,11 @@ fn write_checkpoint(
config.lsmt_config.lsmt_status,
)
.map_err(|e| format!("Failed to write checkpoint: {}", e))?;

state_layout
.mark_files_readonly_and_sync(&log, cp_layout.raw_path(), Some(thread_pool))
.map_err(|e| format!("Failed to mark checkpoint readonly and sync: {}", e))?;

validate_checkpoint_and_remove_unverified_marker(
&cp_layout,
None,
Expand Down
19 changes: 18 additions & 1 deletion rs/state_manager/src/state_sync/chunkable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ impl IncompleteState {
)
});

match state_layout.scratchpad_to_checkpoint(scratchpad_layout, height, Some(thread_pool)) {
match state_layout.scratchpad_to_checkpoint(scratchpad_layout, height) {
Ok(_) => {
let elapsed = started_at.elapsed();
metrics
Expand Down Expand Up @@ -859,6 +859,23 @@ impl IncompleteState {
}
Err(err) => fatal!(log, "Unexpected layout error: {}", err),
}

// Draft notes:
// The relevant functions are spread across make_checkpoint and deliver_state_sync.
// Since make_checkpoint does not return anything, it could perhaps be merged into deliver_state_sync.
// Then deliver_state_sync could perform all the steps one by one in a single place, so that no step is overlooked.
if let Err(err) = state_layout.mark_files_readonly_and_sync(
&log,
scratchpad_layout.raw_path(),
Some(thread_pool),
) {
fatal!(
&log,
"Failed to mark checkpoint {} readonly and sync: {:#}",
scratchpad_layout.raw_path().display(),
err
)
}
}

/// Preallocates the files listed in the manifest and copies the chunks
Expand Down
26 changes: 21 additions & 5 deletions rs/state_manager/src/tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ pub(crate) enum TipRequest {
height: Height,
replicated_state: Box<ReplicatedState>,
},
MarkCheckpointReadOnlyAndSync {
checkpoint_layout: CheckpointLayout<ReadOnly>,
},
/// Compute manifest, store result into states and persist metadata as result.
/// State: *
ComputeManifest {
Expand Down Expand Up @@ -214,11 +217,8 @@ pub(crate) fn spawn_tip_thread(
continue;
}

let cp_or_err = state_layout.scratchpad_to_checkpoint(
tip,
height,
Some(&mut thread_pool),
);
let cp_or_err =
state_layout.scratchpad_to_checkpoint(tip, height);
match cp_or_err {
Err(err) => {
sender
Expand Down Expand Up @@ -454,6 +454,22 @@ pub(crate) fn spawn_tip_thread(
have_latest_manifest = true;
}

TipRequest::MarkCheckpointReadOnlyAndSync { checkpoint_layout } => {
let _timer = request_timer(&metrics, "mark_files_readonly_and_sync");
if let Err(err) = state_layout.mark_files_readonly_and_sync(
&log,
checkpoint_layout.raw_path(),
Some(&mut thread_pool),
) {
fatal!(
&log,
"Failed to mark checkpoint {} readonly and sync: {:#}",
checkpoint_layout.raw_path().display(),
err
)
}
}

TipRequest::ValidateReplicatedState {
checkpoint_layout,
reference_state,
Expand Down

0 comments on commit 756144d

Please sign in to comment.