Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metrics to network connector #265

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 159 additions & 3 deletions src/consensus/malachite/network_connector.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::core::types::SnapchainValidatorContext;
use crate::network::gossip::GossipEvent;
use crate::utils::statsd_wrapper::StatsdClientWrapper;
use async_trait::async_trait;
use informalsystems_malachitebft_core_consensus::SignedConsensusMsg;
use informalsystems_malachitebft_engine::consensus::ConsensusCodec;
Expand All @@ -13,7 +14,7 @@ use libp2p::request_response;
use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort};
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, trace};
use tracing::{debug, error, info, trace};

pub type MalachiteNetworkActorMsg = Msg<SnapchainValidatorContext>;
pub type MalachiteNetworkEvent = Event;
Expand All @@ -27,11 +28,45 @@ pub struct NetworkConnectorState {
output_port: OutputPort<NetworkEvent<SnapchainValidatorContext>>,
inbound_requests: HashMap<sync::InboundRequestId, request_response::InboundRequestId>,
gossip_tx: mpsc::Sender<GossipEvent<SnapchainValidatorContext>>,
statsd_client: StatsdClientWrapper,
shard_id: u32,
}

/// Emit a counter metric for this shard, namespaced under `network_connector.`.
///
/// Thin convenience wrapper around `StatsdClientWrapper::count_with_shard` so
/// call sites only supply the short metric key (e.g. `"send_vote"`).
fn count(
    statsd_client: &StatsdClientWrapper,
    shard_id: u32,
    key: &str,
    value: u64,
    tags: Vec<(String, String)>,
) {
    // Build the fully-qualified metric name once, then forward everything.
    let metric_name = format!("network_connector.{}", key);
    statsd_client.count_with_shard(shard_id, &metric_name, value, tags);
}

/// Emit a gauge metric for this shard, namespaced under `network_connector.`.
///
/// Mirrors [`count`] but records an absolute value (e.g. a block height)
/// via `StatsdClientWrapper::gauge_with_shard`.
fn gauge(
    statsd_client: &StatsdClientWrapper,
    shard_id: u32,
    key: &str,
    value: u64,
    tags: Vec<(String, String)>,
) {
    // Prefix the short key with the component namespace before reporting.
    let metric_name = format!("network_connector.{}", key);
    statsd_client.gauge_with_shard(shard_id, &metric_name, value, tags);
}

/// Arguments for spawning a `MalachiteNetworkConnector` actor.
pub struct NetworkConnectorArgs {
    /// Channel used to publish outbound gossip events to the network layer.
    pub gossip_tx: mpsc::Sender<GossipEvent<SnapchainValidatorContext>>,
    /// This node's own peer id, used when broadcasting sync status.
    pub peer_id: MalachitePeerId,
    /// Statsd client used to report `network_connector.*` metrics.
    pub statsd_client: StatsdClientWrapper,
    /// Shard this connector serves; attached to every metric emitted.
    pub shard_id: u32,
}

impl<Codec> MalachiteNetworkConnector<Codec>
Expand Down Expand Up @@ -70,6 +105,8 @@ where
gossip_tx: args.gossip_tx.clone(),
inbound_requests: HashMap::new(),
peer_id: args.peer_id,
statsd_client: args.statsd_client,
shard_id: args.shard_id,
})
}

Expand All @@ -92,6 +129,8 @@ where
gossip_tx,
inbound_requests,
peer_id: _,
statsd_client,
shard_id,
} = state;

match msg {
Expand All @@ -101,11 +140,27 @@ where

Msg::Publish(msg) => match msg {
SignedConsensusMsg::Vote(vote) => {
count(statsd_client, *shard_id, "send_vote", 1, vec![]);
gauge(
statsd_client,
*shard_id,
"send_vote_height",
vote.height.block_number,
vec![],
);
gossip_tx
.send(GossipEvent::BroadcastSignedVote(vote))
.await?;
}
SignedConsensusMsg::Proposal(proposal) => {
count(statsd_client, *shard_id, "send_proposal", 1, vec![]);
gauge(
statsd_client,
*shard_id,
"send_proposal_height",
proposal.height.block_number,
vec![],
);
gossip_tx
.send(GossipEvent::BroadcastSignedProposal(proposal))
.await?;
Expand All @@ -114,6 +169,14 @@ where

Msg::PublishProposalPart(msg) => {
if let Some(full_proposal) = msg.content.as_data() {
count(statsd_client, *shard_id, "send_proposal_part", 1, vec![]);
gauge(
statsd_client,
*shard_id,
"send_proposal_part_height",
full_proposal.height().block_number,
vec![],
);
gossip_tx
.send(GossipEvent::BroadcastFullProposal(full_proposal.clone()))
.await?;
Expand All @@ -123,16 +186,32 @@ where
}

Msg::BroadcastStatus(status) => {
let height = status.height;
let status = sync::Status {
peer_id: state.peer_id,
height: status.height,
height,
history_min_height: status.history_min_height,
};
count(statsd_client, *shard_id, "send_status", 1, vec![]);
gauge(
statsd_client,
*shard_id,
"broadcast_status_height",
height.block_number,
vec![],
);
gossip_tx.send(GossipEvent::BroadcastStatus(status)).await?
}

Msg::OutgoingRequest(peer_id, request, reply_to) => {
let (tx, rx) = oneshot::channel();
count(
statsd_client,
*shard_id,
"send_sync_request",
1,
vec![("peer_id".to_string(), peer_id.to_string())],
);
gossip_tx
.send(GossipEvent::SyncRequest(peer_id, request, tx))
.await?;
Expand All @@ -143,6 +222,7 @@ where
Msg::OutgoingResponse(request_id, response) => {
let request_id = inbound_requests.remove(&request_id);
if let Some(request_id) = request_id {
count(statsd_client, *shard_id, "send_sync_reply", 1, vec![]);
gossip_tx
.send(GossipEvent::SyncReply(request_id, response))
.await?;
Expand All @@ -157,11 +237,27 @@ where

Msg::NewEvent(Event::PeerConnected(peer_id)) => {
// peers.insert(peer_id);
count(
statsd_client,
*shard_id,
"recv_peer_connected",
1,
vec![("peer_id".to_string(), peer_id.to_string())],
);
info!(peer_id = peer_id.to_string(), shard_id, "Peer connected");
output_port.send(NetworkEvent::PeerConnected(peer_id));
}

Msg::NewEvent(Event::PeerDisconnected(peer_id)) => {
// peers.remove(&peer_id);
count(
statsd_client,
*shard_id,
"recv_peer_disconnected",
1,
vec![("peer_id".to_string(), peer_id.to_string())],
);
info!(peer_id = peer_id.to_string(), shard_id, "Peer disconnected");
output_port.send(NetworkEvent::PeerDisconnected(peer_id));
}

Expand All @@ -175,8 +271,39 @@ where
};

let event = match msg {
SignedConsensusMsg::Vote(vote) => NetworkEvent::Vote(from, vote),
SignedConsensusMsg::Vote(vote) => {
count(
statsd_client,
*shard_id,
"recv_vote",
1,
vec![("peer_id".to_string(), from.to_string())],
);
gauge(
statsd_client,
*shard_id,
"recv_vote_height",
vote.height.block_number,
vec![("peer_id".to_string(), from.to_string())],
);
NetworkEvent::Vote(from, vote)
}

SignedConsensusMsg::Proposal(proposal) => {
count(
statsd_client,
*shard_id,
"recv_proposal",
1,
vec![("peer_id".to_string(), from.to_string())],
);
gauge(
statsd_client,
*shard_id,
"recv_proposal_height",
proposal.height.block_number,
vec![("peer_id".to_string(), from.to_string())],
);
debug!("Received proposal from network");
NetworkEvent::Proposal(from, proposal)
}
Expand All @@ -187,6 +314,7 @@ where

Msg::NewEvent(Event::Message(Channel::ProposalParts, from, data)) => {
debug!("Received proposal parts from network");
count(statsd_client, *shard_id, "recv_proposal_parts", 1, vec![]);
let msg: StreamMessage<<SnapchainValidatorContext as informalsystems_malachitebft_core_types::Context>::ProposalPart> = match self.codec.decode(data) {
Ok(stream_msg) => stream_msg,
Err(e) => {
Expand Down Expand Up @@ -214,6 +342,20 @@ where
return Ok(());
}
};
count(
statsd_client,
*shard_id,
"recv_peer_status",
1,
vec![("peer_id".to_string(), from.to_string())],
);
gauge(
statsd_client,
*shard_id,
"peer_height",
status.height.block_number,
vec![("peer_id".to_string(), status.peer_id.to_string())],
);

// We don't need this check because we're using gossip and not broadcast
// if from != status.peer_id {
Expand Down Expand Up @@ -243,6 +385,13 @@ where
return Ok(());
}
};
count(
statsd_client,
*shard_id,
"recv_sync_request",
1,
vec![("peer_id".to_string(), peer.to_string())],
);

inbound_requests.insert(sync::InboundRequestId::new(request_id), request_id);

Expand All @@ -266,6 +415,13 @@ where
return Ok(());
}
};
count(
statsd_client,
*shard_id,
"recv_sync_response",
1,
vec![("peer_id".to_string(), peer.to_string())],
);

output_port.send(NetworkEvent::Response(
sync::OutboundRequestId::new(request_id),
Expand Down
9 changes: 8 additions & 1 deletion src/consensus/malachite/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::consensus::malachite::snapchain_codec::SnapchainCodec;
use crate::consensus::validator::ShardValidator;
use crate::core::types::{ShardId, SnapchainValidatorContext};
use crate::network::gossip::GossipEvent;
use crate::utils::statsd_wrapper::StatsdClientWrapper;
use informalsystems_malachitebft_engine::sync::{Params as SyncParams, Sync, SyncRef};
use informalsystems_malachitebft_engine::util::events::TxEvent;
use informalsystems_malachitebft_engine::wal::{Wal, WalRef};
Expand All @@ -27,11 +28,15 @@ use tokio::sync::mpsc;
pub async fn spawn_network_actor(
gossip_tx: mpsc::Sender<GossipEvent<SnapchainValidatorContext>>,
local_peer_id: PeerId,
statsd_client: StatsdClientWrapper,
shard_id: u32,
) -> Result<NetworkRef<SnapchainValidatorContext>, ractor::SpawnErr> {
let codec = SnapchainCodec;
let args = NetworkConnectorArgs {
gossip_tx,
peer_id: MalachitePeerId::from_libp2p(&local_peer_id),
statsd_client,
shard_id,
};
MalachiteNetworkConnector::spawn(codec, args)
.await
Expand Down Expand Up @@ -145,6 +150,7 @@ impl MalachiteConsensusActors {
gossip_tx: mpsc::Sender<GossipEvent<SnapchainValidatorContext>>,
registry: &SharedRegistry,
consensus_start_delay: u32,
statsd_client: StatsdClientWrapper,
) -> Result<Self, ractor::SpawnErr> {
let current_height = shard_validator.get_current_height();
let validator_set = shard_validator.get_validator_set();
Expand All @@ -157,7 +163,8 @@ impl MalachiteConsensusActors {
};
let span = tracing::info_span!("node", name = %name);

let network_actor = spawn_network_actor(gossip_tx.clone(), local_peer_id).await?;
let network_actor =
spawn_network_actor(gossip_tx.clone(), local_peer_id, statsd_client, shard_id).await?;
let wal_actor = spawn_wal_actor(
Path::new(format!("{}/shard-{}/wal", db_dir, shard_id).as_str()),
ctx.clone(),
Expand Down
3 changes: 3 additions & 0 deletions src/consensus/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl Proposer for ShardProposer {
self.shard_id.shard_id(),
"proposer.pending_blocks",
self.proposed_chunks.len() as u64,
vec![],
);
}

Expand Down Expand Up @@ -470,11 +471,13 @@ impl Proposer for BlockProposer {
self.shard_id.shard_id(),
"proposer.pending_shards",
self.pending_chunks.len() as u64,
vec![],
);
self.statsd_client.gauge_with_shard(
self.shard_id.shard_id(),
"proposer.pending_blocks",
self.proposed_blocks.len() as u64,
vec![],
);
}

Expand Down
9 changes: 5 additions & 4 deletions src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,20 +276,21 @@ impl Mempool {
messages.insert(message.mempool_key(), message.clone());
self.messages.insert(shard_id, messages);
self.statsd_client
.gauge_with_shard(shard_id, "mempool.size", 1);
.gauge_with_shard(shard_id, "mempool.size", 1, vec![]);
}
Some(messages) => {
messages.insert(message.mempool_key(), message.clone());
self.statsd_client.gauge_with_shard(
shard_id,
"mempool.size",
messages.len() as u64,
vec![],
);
}
}

self.statsd_client
.count_with_shard(shard_id, "mempool.insert.success", 1);
.count_with_shard(shard_id, "mempool.insert.success", 1, vec![]);

match message {
MempoolMessage::UserMessage(_) => {
Expand Down Expand Up @@ -332,11 +333,11 @@ impl Mempool {
for transaction in chunk.transactions {
for user_message in transaction.user_messages {
mempool.remove(&user_message.mempool_key());
self.statsd_client.count_with_shard(height.shard_index, "mempool.remove.success", 1);
self.statsd_client.count_with_shard(height.shard_index, "mempool.remove.success", 1, vec![]);
}
for system_message in transaction.system_messages {
mempool.remove(&system_message.mempool_key());
self.statsd_client.count_with_shard(height.shard_index, "mempool.remove.success", 1);
self.statsd_client.count_with_shard(height.shard_index, "mempool.remove.success", 1, vec![]);
}
}
}
Expand Down
Loading