From dc580383d4f0c44cdc23933409a0bdfbde1a2a46 Mon Sep 17 00:00:00 2001 From: Charles Li Date: Tue, 14 Jan 2025 16:48:21 +0000 Subject: [PATCH] fix(replay): use the first turbine slot to determine caught up --- src/app/fdctl/run/tiles/fd_replay.c | 12 +++++++++++- src/app/fdctl/run/tiles/fd_store_int.c | 15 +++++++++++++-- src/app/fdctl/run/topos/fd_firedancer.c | 8 ++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/app/fdctl/run/tiles/fd_replay.c b/src/app/fdctl/run/tiles/fd_replay.c index 272e5593a8..0a1a980606 100644 --- a/src/app/fdctl/run/tiles/fd_replay.c +++ b/src/app/fdctl/run/tiles/fd_replay.c @@ -284,6 +284,7 @@ struct fd_replay_tile_ctx { through both of those slots. */ ulong * poh; /* proof-of-history slot */ + ulong * fts; /* first turbine slot */ uint poh_init_done; int snapshot_init_done; @@ -1763,7 +1764,8 @@ after_frag( fd_replay_tile_ctx_t * ctx, /* Consensus: send out a new vote by calling send_tower_sync */ /**********************************************************************/ - if( FD_UNLIKELY( ctx->vote && fd_fseq_query( ctx->poh ) == ULONG_MAX ) ) { + if( FD_UNLIKELY( ctx->vote && ctx->curr_slot < fd_fseq_query( ctx->fts ) ) ) { + /* Only proceed with voting if we're caught up, i.e. replay has reached the first turbine slot. fts reads ULONG_MAX until the store tile publishes it, so an unset fts also counts as still catching up. */ FD_LOG_WARNING(( "still catching up. not voting." 
)); @@ -2481,6 +2483,14 @@ unprivileged_init( fd_topo_t * topo, FD_TEST( poh_slot_obj_id!=ULONG_MAX ); ctx->poh = fd_fseq_join( fd_topo_obj_laddr( topo, poh_slot_obj_id ) ); + /**********************************************************************/ + /* first turbine slot (fts) fseq */ + /**********************************************************************/ + + ulong first_turbine_slot_id = fd_pod_query_ulong( topo->props, "fts", ULONG_MAX ); + FD_TEST( first_turbine_slot_id!=ULONG_MAX ); + ctx->fts = fd_fseq_join( fd_topo_obj_laddr( topo, first_turbine_slot_id ) ); + /**********************************************************************/ /* TOML paths */ /**********************************************************************/ diff --git a/src/app/fdctl/run/tiles/fd_store_int.c b/src/app/fdctl/run/tiles/fd_store_int.c index 0c6b103849..788ce160d0 100644 --- a/src/app/fdctl/run/tiles/fd_store_int.c +++ b/src/app/fdctl/run/tiles/fd_store_int.c @@ -131,6 +131,7 @@ struct fd_store_tile_ctx { fd_stake_ci_t * stake_ci; ulong * root_slot_fseq; + ulong * fts; /* first turbine slot */ int sim; @@ -308,7 +309,11 @@ after_frag( fd_store_tile_ctx_t * ctx, } } - fd_store_shred_update_with_shred_from_turbine( ctx->store, &ctx->s34_buffer->pkts[i].shred ); + fd_shred_t * pkt_shred = &ctx->s34_buffer->pkts[i].shred; + fd_store_shred_update_with_shred_from_turbine( ctx->store, pkt_shred ); + if( FD_UNLIKELY( fd_fseq_query( ctx->fts ) == ULONG_MAX ) ) { + fd_fseq_update( ctx->fts, pkt_shred->slot ); + } } } @@ -608,9 +613,15 @@ unprivileged_init( fd_topo_t * topo, ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" ); FD_TEST( root_slot_obj_id!=ULONG_MAX ); ctx->root_slot_fseq = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) ); - if( FD_UNLIKELY( !ctx->root_slot_fseq ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" )); + if( FD_UNLIKELY( !ctx->root_slot_fseq ) ) FD_LOG_ERR(( "topo has no root_slot fseq" )); FD_TEST( 
ULONG_MAX==fd_fseq_query( ctx->root_slot_fseq ) ); + ulong fts_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "fts" ); + FD_TEST( fts_obj_id!=ULONG_MAX ); + ctx->fts = fd_fseq_join( fd_topo_obj_laddr( topo, fts_obj_id ) ); + if( FD_UNLIKELY( !ctx->fts ) ) FD_LOG_ERR(( "topo has no fts fseq" )); + FD_TEST( ULONG_MAX==fd_fseq_query( ctx->fts ) ); + /* Prevent blockstore from being created until we know the shred version */ ulong expected_shred_version = tile->store_int.expected_shred_version; if( FD_LIKELY( !expected_shred_version ) ) { diff --git a/src/app/fdctl/run/topos/fd_firedancer.c b/src/app/fdctl/run/topos/fd_firedancer.c index 6fcc44091c..d97e6f36b8 100644 --- a/src/app/fdctl/run/topos/fd_firedancer.c +++ b/src/app/fdctl/run/topos/fd_firedancer.c @@ -123,6 +123,7 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "replay_notif" ); fd_topob_wksp( topo, "bank_busy" ); fd_topob_wksp( topo, "root_slot" ); + fd_topob_wksp( topo, "fts" ); fd_topob_wksp( topo, "pack_replay" ); fd_topob_wksp( topo, "replay_voter" ); fd_topob_wksp( topo, "gossip_voter" ); @@ -315,6 +316,13 @@ fd_topo_initialize( config_t * config ) { fd_topob_tile_uses( topo, store_tile, root_slot_obj, FD_SHMEM_JOIN_MODE_READ_ONLY ); FD_TEST( fd_pod_insertf_ulong( topo->props, root_slot_obj->id, "root_slot" ) ); + /* This fseq maintains the node's first turbine slot to determine + whether it has caught up. */ + fd_topo_obj_t * fts_obj = fd_topob_obj( topo, "fseq", "fts" ); + fd_topob_tile_uses( topo, replay_tile, fts_obj, FD_SHMEM_JOIN_MODE_READ_ONLY ); + fd_topob_tile_uses( topo, store_tile, fts_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); + FD_TEST( fd_pod_insertf_ulong( topo->props, fts_obj->id, "fts" ) ); + for( ulong i=0UL; itiles[ fd_topo_find_tile( topo, "shred", i ) ]; fd_topob_tile_uses( topo, shred_tile, poh_shred_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );