Skip to content

Commit

Permalink
Support selective fdctl topo
Browse files Browse the repository at this point in the history
  • Loading branch information
riptl committed Feb 13, 2025
1 parent 4d173a0 commit 3186b0d
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 238 deletions.
1 change: 1 addition & 0 deletions src/app/fdctl/Local.mk
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ $(call add-objs,run/tiles/fd_exec,fd_fdctl)
endif

# fdctl topologies
$(call add-objs,run/topos/fd_topos_common,fd_fdctl)
ifdef FD_HAS_NO_AGAVE
$(call add-objs,run/topos/fd_firedancer,fd_fdctl)
else
Expand Down
1 change: 0 additions & 1 deletion src/app/fdctl/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,6 @@ fdctl_cfg_from_env( int * pargc,

fdctl_cfg_validate( config );
validate_ports( config );
fd_topo_initialize( config );
}

int
Expand Down
123 changes: 22 additions & 101 deletions src/app/fdctl/run/topos/fd_firedancer.c
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
/* Firedancer topology used for testing the full validator.
Associated test script: test_firedancer.sh */
#include "../../fdctl.h"
#include "topos.h"

#include "../tiles/fd_replay_notif.h"
#include "../../../../choreo/fd_choreo_base.h"
#include "../../../../disco/quic/fd_tpu.h"
#include "../../../../disco/tiles.h"
#include "../../../../disco/topo/fd_topob.h"
#include "../../../../disco/topo/fd_pod_format.h"
#include "../../../../disco/netlink/fd_netlink_tile.h" /* fd_netlink_topo_create */
#include "../../../../flamenco/runtime/fd_blockstore.h"
#include "../../../../flamenco/runtime/fd_runtime.h"
#include "../../../../flamenco/runtime/fd_txncache.h"
#include "../../../../util/tile/fd_tile_private.h"
#include "../../../../util/shmem/fd_shmem_private.h"
#include "../../../../util/net/fd_net_headers.h"

#include <sys/sysinfo.h>
#include <sys/random.h>
Expand Down Expand Up @@ -64,7 +59,19 @@ setup_topo_txncache( fd_topo_t * topo,
}

void
fd_topo_initialize( config_t * config ) {
fd_topos_create_validator( fd_topo_t * topo,
config_t * config,
fd_topos_affinity_t const * affinity ) {

ulong const * tile_to_cpu = affinity->tile_to_cpu;

int is_agave_auto_affinity = !strcmp( config->layout.agave_affinity, "auto" );
if( FD_UNLIKELY( affinity->is_auto != is_agave_auto_affinity ) ) {
FD_LOG_ERR(( "The CPU affinity string in the configuration file under [layout.affinity] and [layout.agave_affinity] must both be set to 'auto' or both be set to a specific CPU affinity string." ));
}

/* Generate topology */

ulong net_tile_cnt = config->layout.net_tile_count;
ulong shred_tile_cnt = config->layout.shred_tile_count;
ulong quic_tile_cnt = config->layout.quic_tile_count;
Expand All @@ -77,17 +84,15 @@ fd_topo_initialize( config_t * config ) {

int enable_rpc = ( config->rpc.port != 0 );

fd_topo_t * topo = { fd_topob_new( &config->topo, config->name ) };
topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
fd_topob_wksp( topo, "metric_in" );
fd_topos_add_net_tile( topo, config, affinity->tile_to_cpu );

/* topo, name */
fd_topob_wksp( topo, "netbase" );
fd_topob_wksp( topo, "net_netlink" );
fd_topob_wksp( topo, "net_shred" );
fd_topob_wksp( topo, "net_gossip" );
fd_topob_wksp( topo, "net_repair" );
fd_topob_wksp( topo, "net_quic" );
fd_topob_wksp( topo, "net_voter" );
fd_topob_wksp( topo, "net_shred" );
fd_topob_wksp( topo, "net_gossip" );
fd_topob_wksp( topo, "net_repair" );
fd_topob_wksp( topo, "net_quic" );
fd_topob_wksp( topo, "net_voter" );

fd_topob_wksp( topo, "quic_verify" );
fd_topob_wksp( topo, "verify_dedup" );
Expand Down Expand Up @@ -136,8 +141,6 @@ fd_topo_initialize( config_t * config ) {
fd_topob_wksp( topo, "voter_dedup" );
fd_topob_wksp( topo, "batch_replay" );

fd_topob_wksp( topo, "net" );
fd_topob_wksp( topo, "netlnk" );
fd_topob_wksp( topo, "quic" );
fd_topob_wksp( topo, "verify" );
fd_topob_wksp( topo, "dedup" );
Expand Down Expand Up @@ -168,7 +171,6 @@ fd_topo_initialize( config_t * config ) {
#define FOR(cnt) for( ulong i=0UL; i<cnt; i++ )

/* topo, link_name, wksp_name, depth, mtu, burst */
FOR(net_tile_cnt) fd_topob_link( topo, "net_netlink", "net_netlink", 128UL, 0UL, 0UL );
FOR(net_tile_cnt) fd_topob_link( topo, "net_gossip", "net_gossip", config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL );
FOR(net_tile_cnt) fd_topob_link( topo, "net_repair", "net_repair", config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL );
FOR(net_tile_cnt) fd_topob_link( topo, "net_quic", "net_quic", config->tiles.net.send_buffer_size, FD_NET_MTU, 1UL );
Expand Down Expand Up @@ -223,33 +225,7 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_link( topo, "batch_replay", "batch_replay", 128UL, 32UL, 1UL );
ushort parsed_tile_to_cpu[ FD_TILE_MAX ];
/* Unassigned tiles will be floating, unless auto topology is enabled. */
for( ulong i=0UL; i<FD_TILE_MAX; i++ ) parsed_tile_to_cpu[ i ] = USHORT_MAX;

int is_auto_affinity = !strcmp( config->layout.affinity, "auto" );
int is_bench_auto_affinity = !strcmp( config->development.bench.affinity, "auto" );

if( FD_UNLIKELY( is_auto_affinity != is_bench_auto_affinity ) ) {
FD_LOG_ERR(( "The CPU affinity string in the configuration file under [layout.affinity] and [development.bench.affinity] must all be set to 'auto' or all be set to a specific CPU affinity string." ));
}

ulong affinity_tile_cnt = 0UL;
if( FD_LIKELY( !is_auto_affinity ) ) affinity_tile_cnt = fd_tile_private_cpus_parse( config->layout.affinity, parsed_tile_to_cpu );

ulong tile_to_cpu[ FD_TILE_MAX ];
for( ulong i=0UL; i<affinity_tile_cnt; i++ ) {
if( FD_UNLIKELY( parsed_tile_to_cpu[ i ]!=USHORT_MAX && parsed_tile_to_cpu[ i ]>=fd_numa_cpu_cnt() ) )
FD_LOG_ERR(( "The CPU affinity string in the configuration file under [layout.affinity] specifies a CPU index of %hu, but the system "
"only has %lu CPUs. You should either change the CPU allocations in the affinity string, or increase the number of CPUs "
"in the system.",
parsed_tile_to_cpu[ i ], fd_numa_cpu_cnt() ));
tile_to_cpu[ i ] = fd_ulong_if( parsed_tile_to_cpu[ i ]==USHORT_MAX, ULONG_MAX, (ulong)parsed_tile_to_cpu[ i ] );
}

/* topo, tile_name, tile_wksp, metrics_wksp, cpu_idx, is_agave */
FOR(net_tile_cnt) fd_topob_tile( topo, "net", "net", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 );
fd_topo_tile_t * netlink_tile = fd_topob_tile( topo, "netlnk" , "netlnk", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 );
FOR(quic_tile_cnt) fd_topob_tile( topo, "quic", "quic", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 );
FOR(verify_tile_cnt) fd_topob_tile( topo, "verify", "verify", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 );
/**/ fd_topob_tile( topo, "dedup", "dedup", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0 );
Expand Down Expand Up @@ -345,29 +321,9 @@ fd_topo_initialize( config_t * config ) {
fd_topob_tile_uses( topo, snaps_tile, constipated_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
FD_TEST( fd_pod_insertf_ulong( topo->props, constipated_obj->id, "constipate" ) );

if( FD_LIKELY( !is_auto_affinity ) ) {
if( FD_UNLIKELY( affinity_tile_cnt<topo->tile_cnt ) )
FD_LOG_ERR(( "The topology you are using has %lu tiles, but the CPU affinity specified in the config tile as [layout.affinity] only provides for %lu cores. "
"You should either increase the number of cores dedicated to Firedancer in the affinity string, or decrease the number of cores needed by reducing "
"the total tile count. You can reduce the tile count by decreasing individual tile counts in the [layout] section of the configuration file.",
topo->tile_cnt, affinity_tile_cnt ));
if( FD_UNLIKELY( affinity_tile_cnt>topo->tile_cnt ) )
FD_LOG_WARNING(( "The topology you are using has %lu tiles, but the CPU affinity specified in the config tile as [layout.affinity] provides for %lu cores. "
"Not all cores in the affinity will be used by Firedancer. You may wish to increase the number of tiles in the system by increasing "
"individual tile counts in the [layout] section of the configuration file.",
topo->tile_cnt, affinity_tile_cnt ));
}

/* The netlink tile shares various objects to net tiles */
fd_netlink_topo_create( netlink_tile, topo, config );
for( ulong i=0UL; i<net_tile_cnt; i++ ) {
ulong net_tile_id = fd_topo_find_tile( topo, "net", i ); FD_TEST( net_tile_id!=ULONG_MAX );
fd_netlink_topo_join( topo, netlink_tile, &topo->tiles[ net_tile_id ] );
}
fd_topob_tile_in( topo, "netlnk", 0UL, "metric_in", "net_netlink", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED );
fd_topos_detect_affinity_mismatch( topo, affinity );

/* topo, tile_name, tile_kind_id, fseq_wksp, link_name, link_kind_id, reliable, polled */
FOR(net_tile_cnt) fd_topob_tile_out( topo, "net", i, "net_netlink", i );
FOR(net_tile_cnt) for( ulong j=0UL; j<shred_tile_cnt; j++ )
fd_topob_tile_in( topo, "net", i, "metric_in", "shred_net", j, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
FOR(net_tile_cnt) fd_topob_tile_out( topo, "net", i, "net_shred", i );
Expand Down Expand Up @@ -528,15 +484,6 @@ fd_topo_initialize( config_t * config ) {
fd_topo_tile_t * tile = &topo->tiles[ i ];

if( FD_UNLIKELY( !strcmp( tile->name, "net" ) ) ) {
strncpy( tile->net.interface, config->tiles.net.interface, sizeof(tile->net.interface) );
memcpy( tile->net.src_mac_addr, config->tiles.net.mac_addr, 6UL );

tile->net.tx_flush_timeout_ns = (long)config->tiles.net.flush_timeout_micros * 1000L;
tile->net.xdp_rx_queue_size = config->tiles.net.xdp_rx_queue_size;
tile->net.xdp_tx_queue_size = config->tiles.net.xdp_tx_queue_size;
tile->net.src_ip_addr = config->tiles.net.ip_addr;
tile->net.zero_copy = 0==strcmp( config->tiles.net.xdp_mode, "drv" ); /* only enable for drv */
fd_memcpy( tile->net.xdp_mode, config->tiles.net.xdp_mode, 8 );

tile->net.shred_listen_port = config->tiles.shred.shred_listen_port;
tile->net.quic_transaction_listen_port = config->tiles.quic.quic_transaction_listen_port;
Expand All @@ -545,16 +492,6 @@ fd_topo_initialize( config_t * config ) {
tile->net.repair_intake_listen_port = config->tiles.repair.repair_intake_listen_port;
tile->net.repair_serve_listen_port = config->tiles.repair.repair_serve_listen_port;

tile->net.netdev_dbl_buf_obj_id = netlink_tile->netlink.netdev_dbl_buf_obj_id;
tile->net.fib4_main_obj_id = netlink_tile->netlink.fib4_main_obj_id;
tile->net.fib4_local_obj_id = netlink_tile->netlink.fib4_local_obj_id;
tile->net.neigh4_obj_id = netlink_tile->netlink.neigh4_obj_id;
tile->net.neigh4_ele_obj_id = netlink_tile->netlink.neigh4_ele_obj_id;

} else if( FD_UNLIKELY( !strcmp( tile->name, "netlnk" ) ) ) {

/* already configured */

} else if( FD_UNLIKELY( !strcmp( tile->name, "quic" ) ) ) {
fd_memcpy( tile->quic.src_mac_addr, config->tiles.net.mac_addr, 6 );

Expand Down Expand Up @@ -679,8 +616,6 @@ fd_topo_initialize( config_t * config ) {
FD_LOG_NOTICE(("config->consensus.identity_path: %s", config->consensus.identity_path));
FD_LOG_NOTICE(("config->consensus.vote_account_path: %s", config->consensus.vote_account_path));

} else if( FD_UNLIKELY( !strcmp( tile->name, "bhole" ) ) ) {

} else if( FD_UNLIKELY( !strcmp( tile->name, "sign" ) ) ) {
strncpy( tile->sign.identity_key_path, config->consensus.identity_path, sizeof(tile->sign.identity_key_path) );

Expand All @@ -689,8 +624,6 @@ fd_topo_initialize( config_t * config ) {
FD_LOG_ERR(( "failed to parse prometheus listen address `%s`", config->tiles.metric.prometheus_listen_address ));
tile->metric.prometheus_listen_port = config->tiles.metric.prometheus_listen_port;

} else if( FD_UNLIKELY( !strcmp( tile->name, "rtpool" ) ) ) {
/* Nothing for now */
} else if( FD_UNLIKELY( !strcmp( tile->name, "pack" ) ) ) {
strncpy( tile->pack.identity_key_path, config->consensus.identity_path, sizeof(tile->pack.identity_key_path) );

Expand Down Expand Up @@ -728,28 +661,16 @@ fd_topo_initialize( config_t * config ) {
strncpy( tile->batch.out_dir, config->tiles.batch.out_dir, sizeof(tile->batch.out_dir) );
tile->batch.hash_tpool_thread_count = config->tiles.batch.hash_tpool_thread_count;
strncpy( tile->replay.funk_file, config->tiles.replay.funk_file, sizeof(tile->replay.funk_file) );
} else if( FD_UNLIKELY( !strcmp( tile->name, "btpool" ) ) ) {
/* Nothing for now */
} else if( FD_UNLIKELY( !strcmp( tile->name, "gui" ) ) ) {
if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( config->tiles.gui.gui_listen_address, &tile->gui.listen_addr ) ) )
FD_LOG_ERR(( "failed to parse gui listen address `%s`", config->tiles.gui.gui_listen_address ));
tile->gui.listen_port = config->tiles.gui.gui_listen_port;
tile->gui.is_voting = strcmp( config->consensus.vote_account_path, "" );
strncpy( tile->gui.cluster, config->cluster, sizeof(tile->gui.cluster) );
strncpy( tile->gui.identity_key_path, config->consensus.identity_path, sizeof(tile->gui.identity_key_path) );
} else if( FD_UNLIKELY( !strcmp( tile->name, "plugin" ) ) ) {

} else if( FD_UNLIKELY( !strcmp( tile->name, "exec" ) ) ) {

} else {
FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
}
}

if( FD_UNLIKELY( is_auto_affinity ) ) fd_topob_auto_layout( topo );

fd_topob_finish( topo, fdctl_obj_align, fdctl_obj_footprint, fdctl_obj_loose );

const char * status_cache = config->tiles.replay.status_cache;
if ( strlen( status_cache ) > 0 ) {
/* Make the status cache workspace match the parameters used to create the
Expand Down
Loading

0 comments on commit 3186b0d

Please sign in to comment.