feat: implement end
karlem committed Feb 26, 2025
1 parent 89c65b1 commit e46c040
Showing 8 changed files with 256 additions and 64 deletions.
175 changes: 164 additions & 11 deletions fendermint/vm/interpreter/src/bottomup.rs
@@ -1,7 +1,34 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::fvm::state::ipc::GatewayCaller;
use crate::fvm::PowerUpdates;
use async_stm::atomically;
use fendermint_vm_message::chain::ChainMessage;
use fendermint_vm_message::ipc::{BottomUpCheckpoint, CertifiedMessage, IpcMessage};
use fendermint_vm_resolver::pool::{ResolveKey, ResolvePool};
use tendermint_rpc::Client;

use fendermint_crypto::{PublicKey, SecretKey};
use fvm_ipld_blockstore::Blockstore;
use ipc_observability::{emit, observe::TracingError, Traceable};

use crate::fvm::observe::CheckpointFinalized;
// TODO Karel - this should be moved here.
use crate::fvm::ValidatorContext;

use anyhow::Context;

use crate::fvm::broadcast::Broadcaster;
use crate::fvm::checkpoint::{
broadcast_incomplete_signatures, emit_trace_if_check_checkpoint_finalized,
maybe_create_checkpoint, unsigned_checkpoints,
};
use crate::fvm::state::FvmExecState;

use crate::types::BlockEndEvents;

use fvm_shared::address::Address;

#[derive(Clone, Hash, PartialEq, Eq)]
pub enum CheckpointPoolItem {
@@ -28,21 +55,48 @@ impl From<&CheckpointPoolItem> for ResolveKey {
}
}

pub struct BottomUpCheckpointResolver {
// TODO Karel - clean this up. This should probably not be here and also the bottom up check ABI should not leak here.
pub struct CheckpointOutcome {
pub checkpoint: ipc_actors_abis::checkpointing_facet::BottomUpCheckpoint,
pub power_updates: PowerUpdates,
pub block_end_events: BlockEndEvents,
}

pub struct BottomUpManager<DB, C>
where
DB: Blockstore + Clone + 'static + Send + Sync,
C: Client + Clone + Send + Sync + 'static,
{
pool: ResolvePool<CheckpointPoolItem>,

/// Tendermint client for querying the RPC.
tendermint_client: C,
/// If this is a validator node, this should be the key we can use to sign transactions.
validator_ctx: Option<ValidatorContext<C>>,

gateway_caller: GatewayCaller<DB>,
}

impl BottomUpCheckpointResolver {
pub fn new(resolve_pool: ResolvePool<CheckpointPoolItem>) -> Self {
Self { pool: resolve_pool }
impl<DB, C> BottomUpManager<DB, C>
where
DB: Blockstore + Clone + 'static + Send + Sync,
C: Client + Clone + Send + Sync + 'static,
{
pub fn new(
resolve_pool: ResolvePool<CheckpointPoolItem>,
tendermint_client: C,
validator_ctx: Option<ValidatorContext<C>>,
) -> Self {
Self {
tendermint_client,
pool: resolve_pool,
validator_ctx,
gateway_caller: GatewayCaller::default(),
}
}

pub async fn check_checkpoint_resolved(
&self,
msg: CertifiedMessage<BottomUpCheckpoint>,
) -> bool {
let item = CheckpointPoolItem::BottomUp(msg);

// Checks if the bottom up checkpoint is already resolved
pub async fn is_checkpoint_resolved(&self, item: CheckpointPoolItem) -> bool {
// We can just look in memory because when we start the application, we should retrieve any
// pending checkpoints (relayed but not executed) from the ledger, so they should be there.
// We don't have to validate the checkpoint here, because
@@ -56,7 +110,7 @@ impl BottomUpCheckpointResolver {
is_resolved
}

// Checks the bottom up checkpoint pool and returns the messages that are ready for execution
// Checks the bottom up checkpoint pool and returns the messages that represent the checkpoints for execution
pub async fn messages_from_resolved_checkpoints(&self) -> Vec<ChainMessage> {
let resolved = atomically(|| self.pool.collect_resolved()).await;
resolved
@@ -68,4 +122,103 @@ impl BottomUpCheckpointResolver {
})
.collect()
}

pub fn create_checkpoint_if_needed(
&self,
state: &mut FvmExecState<DB>,
) -> anyhow::Result<Option<CheckpointOutcome>> {
let mut block_end_events = BlockEndEvents::default();

// Emit trace; errors here are logged but not fatal.
let _ = emit_trace_if_check_checkpoint_finalized(&self.gateway_caller, state).inspect_err(
|e| {
emit(TracingError {
affected_event: CheckpointFinalized::name(),
reason: e.to_string(),
});
},
);

let maybe_result =
maybe_create_checkpoint(&self.gateway_caller, state, &mut block_end_events)
.context("failed to create checkpoint")?;

if let Some((checkpoint, power_updates)) = maybe_result {
Ok(Some(CheckpointOutcome {
checkpoint,
power_updates,
block_end_events,
}))
} else {
Ok(None)
}
}

pub async fn cast_validator_signatures_for_incomplete_checkpoints(
&self,
current_checkpoint: ipc_actors_abis::checkpointing_facet::BottomUpCheckpoint,
state: &mut FvmExecState<DB>,
) -> anyhow::Result<()> {
// Exit early if there's no validator context.
let validator_ctx = match self.validator_ctx.as_ref() {
Some(ctx) => ctx,
None => return Ok(()),
};

// If we're currently syncing, do not resend past signatures.
if self.syncing().await {
return Ok(());
}

// Retrieve incomplete checkpoints synchronously (state cannot be shared across threads).
let incomplete_checkpoints =
unsigned_checkpoints(&self.gateway_caller, state, validator_ctx.public_key)
.context("failed to fetch incomplete checkpoints")?;

// Ensure that the current checkpoint exists among the incomplete ones.
debug_assert!(
incomplete_checkpoints.iter().any(|checkpoint| {
checkpoint.block_height == current_checkpoint.block_height
&& checkpoint.block_hash == current_checkpoint.block_hash
}),
"the current checkpoint is incomplete"
);

// Clone the necessary values to move into the asynchronous task.
let client = self.tendermint_client.clone();
let gateway = self.gateway_caller.clone();
let chain_id = state.chain_id();
let height = current_checkpoint.block_height;
let validator_ctx = validator_ctx.clone();

// Spawn an asynchronous task to broadcast incomplete checkpoint signatures.
tokio::spawn(async move {
if let Err(e) = broadcast_incomplete_signatures(
&client,
&validator_ctx,
&gateway,
chain_id,
incomplete_checkpoints,
)
.await
{
tracing::error!(error = ?e, height = height.as_u64(), "error broadcasting checkpoint signature");
}
});

Ok(())
}

/// Indicates whether the node is still syncing with the rest of the network and hasn't caught up with the tip yet.
async fn syncing(&self) -> bool {
match self.tendermint_client.status().await {
Ok(status) => status.sync_info.catching_up,
Err(e) => {
// CometBFT often takes a long time to boot, e.g. while it's replaying blocks it won't
// respond to JSON-RPC calls. Let's treat this as an indication that we are syncing.
tracing::warn!(error =? e, "failed to get CometBFT sync status");
true
}
}
}
}
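
To see how these pieces compose, here is a minimal sketch of an end-of-block driver built on the new `BottomUpManager` API above. The `end_block_checkpointing` function and the import paths are assumptions; only the manager's methods and the `CheckpointOutcome` fields come from this diff.

```rust
use anyhow::Result;
use fvm_ipld_blockstore::Blockstore;
use tendermint_rpc::Client;

use crate::bottomup::BottomUpManager;
use crate::fvm::state::FvmExecState;
use crate::fvm::PowerUpdates;
use crate::types::BlockEndEvents;

// Hypothetical caller: create a checkpoint if the height calls for one, then
// broadcast this validator's signatures for any still-incomplete checkpoints.
async fn end_block_checkpointing<DB, C>(
    manager: &BottomUpManager<DB, C>,
    state: &mut FvmExecState<DB>,
) -> Result<Option<(PowerUpdates, BlockEndEvents)>>
where
    DB: Blockstore + Clone + 'static + Send + Sync,
    C: Client + Clone + Send + Sync + 'static,
{
    let Some(outcome) = manager.create_checkpoint_if_needed(state)? else {
        return Ok(None);
    };

    // A no-op for non-validator nodes; skipped while the node is still syncing.
    manager
        .cast_validator_signatures_for_incomplete_checkpoints(outcome.checkpoint, state)
        .await?;

    Ok(Some((outcome.power_updates, outcome.block_end_events)))
}
```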
10 changes: 5 additions & 5 deletions fendermint/vm/interpreter/src/check.rs
@@ -13,12 +13,12 @@ use crate::types::*;
pub fn check_nonce_and_sufficient_balance(
state: &FvmExecState<ReadOnlyBlockstore<Arc<impl Blockstore + Clone + 'static>>>,
msg: &FvmMessage,
) -> anyhow::Result<FvmCheckRet> {
) -> anyhow::Result<CheckResponse> {
// Look up the actor associated with the sender's address.
let actor = match lookup_actor(&state, &msg.from)? {
Some(actor) => actor,
None => {
return Ok(FvmCheckRet::new(
return Ok(CheckResponse::new(
msg,
ExitCode::SYS_SENDER_STATE_INVALID,
None,
@@ -31,7 +31,7 @@ pub fn check_nonce_and_sufficient_balance(

// Check for sufficient balance.
if actor.balance < balance_needed {
return Ok(FvmCheckRet::new(
return Ok(CheckResponse::new(
msg,
ExitCode::SYS_SENDER_STATE_INVALID,
Some(format!(
@@ -43,7 +43,7 @@ pub fn check_nonce_and_sufficient_balance(

// Check for a nonce match.
if actor.sequence != msg.sequence {
return Ok(FvmCheckRet::new(
return Ok(CheckResponse::new(
msg,
ExitCode::SYS_SENDER_STATE_INVALID,
Some(format!(
@@ -53,7 +53,7 @@ pub fn check_nonce_and_sufficient_balance(
));
}

Ok(FvmCheckRet::new(msg, ExitCode::OK, None))
Ok(CheckResponse::new(msg, ExitCode::OK, None))
}

/// Looks up an actor by address in the state tree.
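
A note on the renamed type: all three failure paths build a `CheckResponse` with `ExitCode::SYS_SENDER_STATE_INVALID` plus an optional human-readable reason, while the success path uses `ExitCode::OK`. A hypothetical caller might gate on that code; the `exit_code` field below is an assumption, since the diff only shows the `CheckResponse::new(msg, exit_code, info)` constructor.

```rust
use fvm_shared::error::ExitCode;

use crate::types::CheckResponse;

// Hypothetical helper: accept a message only when the pre-checks passed.
// `exit_code` as a public field is an assumption about CheckResponse's shape.
fn passes_pre_checks(resp: &CheckResponse) -> bool {
    resp.exit_code == ExitCode::OK
}
```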
10 changes: 5 additions & 5 deletions fendermint/vm/interpreter/src/fvm/mod.rs
@@ -1,7 +1,7 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

mod broadcast;
pub mod broadcast;
mod check;
pub mod checkpoint;
mod exec;
@@ -39,14 +39,14 @@ pub type BlockGasLimit = u64;
#[derive(Clone)]
pub struct ValidatorContext<C> {
/// The secret key the validator uses to produce blocks.
secret_key: SecretKey,
pub secret_key: SecretKey,
/// The public key identifying the validator (corresponds to the secret key.)
public_key: PublicKey,
pub public_key: PublicKey,
/// The address associated with the public key.
addr: Address,
pub addr: Address,
/// Used to broadcast transactions. It might use a different secret key for
/// signing transactions than the validator's block producing key.
broadcaster: Broadcaster<C>,
pub broadcaster: Broadcaster<C>,
}

impl<C> ValidatorContext<C> {
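
Making the `ValidatorContext` fields public is what lets `bottomup.rs` read `validator_ctx.public_key` directly. A small illustrative sketch; the helper itself is hypothetical:

```rust
use fendermint_crypto::PublicKey;
use fvm_shared::address::Address;

use crate::fvm::ValidatorContext;

// Hypothetical helper enabled by the now-public fields: expose the validator's
// identity without going through accessor methods.
fn validator_identity<C>(ctx: &ValidatorContext<C>) -> (&PublicKey, &Address) {
    (&ctx.public_key, &ctx.addr)
}
```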
10 changes: 5 additions & 5 deletions fendermint/vm/interpreter/src/implicit_messages.rs
@@ -24,7 +24,7 @@ fn execute_implicit_message<DB: Blockstore + Clone + 'static + Send + Sync>(
gas_limit: u64,
method_num: u64,
params: RawBytes,
) -> anyhow::Result<FvmApplyRet> {
) -> anyhow::Result<ApplyResponse> {
let msg = FvmMessage {
from,
to,
@@ -42,7 +42,7 @@ fn execute_implicit_message<DB: Blockstore + Clone + 'static + Send + Sync>(
if let Some(err) = apply_ret.failure_info {
anyhow::bail!("failed to apply system message: {}", err);
}
Ok(FvmApplyRet {
Ok(ApplyResponse {
apply_ret,
emitters,
from,
@@ -56,7 +56,7 @@ fn execute_implicit_message<DB: Blockstore + Clone + 'static + Send + Sync>(
pub fn execute_cron_message<DB: Blockstore + Clone + 'static + Send + Sync>(
state: &mut FvmExecState<DB>,
height: u64,
) -> anyhow::Result<FvmApplyRet> {
) -> anyhow::Result<ApplyResponse> {
let from = system::SYSTEM_ACTOR_ADDR;
let to = cron::CRON_ACTOR_ADDR;
let method_num = cron::Method::EpochTick as u64;
@@ -67,10 +67,10 @@ pub fn execute_cron_message<DB: Blockstore + Clone + 'static + Send + Sync>(
}

/// Attempts to push chain metadata if a block hash is available.
pub fn maybe_push_chain_metadata<DB: Blockstore + Clone + 'static + Send + Sync>(
pub fn push_chain_metadata_if_possible<DB: Blockstore + Clone + 'static + Send + Sync>(
state: &mut FvmExecState<DB>,
height: u64,
) -> anyhow::Result<Option<FvmApplyRet>> {
) -> anyhow::Result<Option<ApplyResponse>> {
let from = system::SYSTEM_ACTOR_ADDR;
let to = chainmetadata::CHAINMETADATA_ACTOR_ADDR;
let method_num = fendermint_actor_chainmetadata::Method::PushBlockHash as u64;
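
Taken together, a per-block driver for these implicit messages might look like the following sketch. Only the two public signatures above are from the diff; the driver function and the import paths are assumptions.

```rust
use anyhow::Result;
use fvm_ipld_blockstore::Blockstore;

use crate::fvm::state::FvmExecState;
use crate::implicit_messages::{execute_cron_message, push_chain_metadata_if_possible};
use crate::types::ApplyResponse;

// Hypothetical block-level driver: cron runs unconditionally, while chain
// metadata is pushed only when a block hash is available for the height.
fn run_implicit_messages<DB: Blockstore + Clone + 'static + Send + Sync>(
    state: &mut FvmExecState<DB>,
    height: u64,
) -> Result<Vec<ApplyResponse>> {
    let mut applied = Vec::new();
    applied.push(execute_cron_message(state, height)?);
    if let Some(ret) = push_chain_metadata_if_possible(state, height)? {
        applied.push(ret);
    }
    Ok(applied)
}
```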
(Diffs for the remaining 4 changed files are not shown.)
