
Prioritize removing clients over validators in the heartbeat logic #3504

Open · wants to merge 13 commits into base: staging
106 changes: 56 additions & 50 deletions node/router/src/heartbeat.rs
@@ -15,6 +15,7 @@

use crate::{
Outbound,
Peer,
Router,
messages::{DisconnectReason, Message, PeerRequest},
};
@@ -104,8 +105,44 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
}
}

/// This function removes the oldest connected peer, to keep the connections fresh.
/// This function only triggers if the router is above the minimum number of connected peers.
/// Returns a sorted vector of network addresses of all removable connected peers,
/// where the first entry has the lowest priority and the last one the highest.
///
/// Rules:
/// - Trusted peers and bootstrap nodes are not removable
/// - Peers that we are currently syncing with are not removable
/// - Validators are considered higher priority than provers or clients
/// - Connections that have not been seen in a while are considered lower priority
fn get_removable_peers(&self) -> Vec<Peer<N>> {
// The trusted peers (specified at runtime)
let trusted = self.router().trusted_peers();
// The hardcoded bootstrap nodes
let bootstrap = self.router().bootstrap_peers();
// Are we synced already? (cache this here, so it does not need to be recomputed)
let is_block_synced = self.is_block_synced();

// Sort by priority, where lowest priority will be at the beginning
// of the vector.
// Note that this gives equal priority to clients and provers, which
// we might want to change in the future.
let mut peers = self.router().get_connected_peers();
peers.sort_by_key(|peer| (peer.is_validator(), peer.last_seen()));

// Determine which of the peers can be removed.
peers
.into_iter()
.filter(|peer| {
!trusted.contains(&peer.ip()) // Always keep trusted nodes
&& !bootstrap.contains(&peer.ip()) // Always keep bootstrap nodes
&& !self.router().cache.contains_inbound_block_request(&peer.ip()) // Keep peers that are currently syncing from us
@raychu86 (Contributor) commented on Mar 3, 2025:
Can this be exploited in a way where a malicious validator just needs to keep sending block requests to prevent the peer disconnects?

@vicsn (Collaborator) replied on Mar 6, 2025:

Yes, same for malicious clients, this was already an issue before this PR.

@kaimast want to create a new issue for this? A simple solution as noted here would be to refresh a random selection of syncing peers under some conditions, I bet with further analysis we can avoid hurting sync performance of honest nodes too much.

The PR author (Collaborator) replied:

Even without this check, couldn't a malicious peer send Ping messages frequently to appear more responsive and receive higher priority?

&& (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.ip()) == 0) // Keep peers we are currently syncing from
})
.collect()
}

/// This function removes the peer that we have not heard from the longest,
/// to keep the connections fresh.
/// It only triggers if the router is above the minimum number of connected peers.
fn remove_oldest_connected_peer(&self) {
// Skip if the router is at or below the minimum number of connected peers.
if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
@@ -117,32 +154,15 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
return;
}

// Retrieve the trusted peers.
let trusted = self.router().trusted_peers();
// Retrieve the bootstrap peers.
let bootstrap = self.router().bootstrap_peers();

// Find the oldest connected peer, that is neither trusted nor a bootstrap peer.
let oldest_peer = self
.router()
.get_connected_peers()
.iter()
.filter(|peer| !trusted.contains(&peer.ip()) && !bootstrap.contains(&peer.ip()))
.filter(|peer| !self.router().cache.contains_inbound_block_request(&peer.ip())) // Skip if the peer is syncing.
.filter(|peer| self.is_block_synced() || self.router().cache.num_outbound_block_requests(&peer.ip()) == 0) // Skip if you are syncing from this peer.
.min_by_key(|peer| peer.last_seen())
.map(|peer| peer.ip());

// Disconnect from the oldest connected peer, if one exists.
if let Some(oldest) = oldest_peer {
        if let Some(oldest) = self.get_removable_peers().into_iter().next().map(|peer| peer.ip()) {
info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
let _ = self.send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
// Disconnect from this peer.
self.router().disconnect(oldest);
}
}

/// TODO (howardwu): If the node is a validator, keep the validator.
/// This function keeps the number of connected peers within the allowed range.
fn handle_connected_peers(&self) {
// Initialize an RNG.
@@ -182,50 +202,36 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
let bootstrap = self.router().bootstrap_peers();

// Determine the provers to disconnect from.
let prover_ips_to_disconnect = self
let provers_to_disconnect = self
.router()
.connected_provers()
.into_iter()
.filter(|peer_ip| !trusted.contains(peer_ip) && !bootstrap.contains(peer_ip))
.filter(|peer_addr| !trusted.contains(peer_addr) && !bootstrap.contains(peer_addr))
.choose_multiple(rng, num_surplus_provers);

// TODO (howardwu): As a validator, prioritize disconnecting from clients.
// Remove RNG, pick the `n` oldest nodes.
// Determine the clients and validators to disconnect from.
let peer_ips_to_disconnect = self
.router()
.get_connected_peers()
let peers_to_disconnect = self
.get_removable_peers()
.into_iter()
.filter_map(|peer| {
let peer_ip = peer.ip();
if !peer.is_prover() && // Skip if the peer is a prover.
!trusted.contains(&peer_ip) && // Skip if the peer is trusted.
!bootstrap.contains(&peer_ip) && // Skip if the peer is a bootstrap peer.
// Skip if you are syncing from this peer.
(self.is_block_synced() || (!self.is_block_synced() && self.router().cache.num_outbound_block_requests(&peer.ip()) == 0))
{
Some(peer_ip)
} else {
None
}
})
.choose_multiple(rng, num_surplus_clients_validators);
            .filter(|peer| !peer.is_prover()) // Skip provers, as they are handled separately.
.map(|p| p.ip())
.take(num_surplus_clients_validators);

// Proceed to send disconnect requests to these peers.
for peer_ip in peer_ips_to_disconnect.into_iter().chain(prover_ips_to_disconnect) {
for peer_addr in peers_to_disconnect.chain(provers_to_disconnect) {
// TODO (howardwu): Remove this after specializing this function.
if self.router().node_type().is_prover() {
if let Some(peer) = self.router().get_connected_peer(&peer_ip) {
if let Some(peer) = self.router().get_connected_peer(&peer_addr) {
if peer.node_type().is_validator() {
continue;
}
}
}

info!("Disconnecting from '{peer_ip}' (exceeded maximum connections)");
self.send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
self.send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
// Disconnect from this peer.
self.router().disconnect(peer_ip);
self.router().disconnect(peer_addr);
@niklaslong (Collaborator) commented on Mar 3, 2025:

I think peer_ip here was correct? disconnect internally calls resolve_to_ambiguous which maps the listener to the ephemeral address (ip + port).

You might have noticed this already but just in case, the convention is:

- `peer_ip` (granted, a slight misnomer): the peer's listener `SocketAddr` (ip + port)
- `peer_addr`: the peer's ephemeral `SocketAddr` (ip + port, though only the port should be different from the listener)

@vicsn (Collaborator) replied on Mar 6, 2025:

Is this naming convention documented anywhere? If not, where would be a good place?

The PR author (Collaborator) replied:

We could add some more documentation to Resolver. It should also mention that the peer_id / listen address is used to uniquely identify a peer in some parts of the code.

}
}

@@ -290,11 +296,11 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
/// This function attempts to connect to any disconnected trusted peers.
fn handle_trusted_peers(&self) {
// Ensure that the trusted nodes are connected.
for peer_ip in self.router().trusted_peers() {
for peer_addr in self.router().trusted_peers() {
A collaborator commented:

This should also be peer_ip as iirc they are the pre-configured trusted listener addresses the nodes can connect to.

// If the peer is not connected, attempt to connect to it.
if !self.router().is_connected(peer_ip) {
// Attempt to connect to the trusted peer.
self.router().connect(*peer_ip);
if !self.router().is_connected(peer_addr) {
debug!("Attempting to (re-)connect to trusted peer `{peer_addr}`");
self.router().connect(*peer_addr);
}
}
}
4 changes: 2 additions & 2 deletions node/router/src/helpers/cache.rs
@@ -125,7 +125,7 @@ impl<N: Network> Cache<N> {
}

impl<N: Network> Cache<N> {
/// Returns `true` if the cache contains the block request for the given peer.
/// Returns `true` if the cache contains any inbound block requests for the given peer.
pub fn contains_inbound_block_request(&self, peer_ip: &SocketAddr) -> bool {
Self::retain(&self.seen_inbound_block_requests, *peer_ip, Self::INBOUND_BLOCK_REQUEST_INTERVAL) > 0
}
@@ -135,7 +135,7 @@ impl<N: Network> Cache<N> {
self.seen_outbound_block_requests.read().get(peer_ip).map(|r| r.len()).unwrap_or(0)
}

/// Returns `true` if the cache contains the block request for the given peer.
/// Returns `true` if the cache contains the given block request for the specified peer.
pub fn contains_outbound_block_request(&self, peer_ip: &SocketAddr, request: &BlockRequest) -> bool {
self.seen_outbound_block_requests.read().get(peer_ip).map(|r| r.contains(request)).unwrap_or(false)
}
Expand Down