Skip to content

Commit

Permalink
refactor: update and formalize tracing schema
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-forbes committed Aug 4, 2023
1 parent 01cc646 commit 2c1251b
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 25 deletions.
30 changes: 16 additions & 14 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/bytes"
cmtbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mempool"
Expand All @@ -31,6 +30,9 @@ const (
// peerHeightDiff signifies the tolerance in difference in height between the peer and the height
// the node received the tx
peerHeightDiff = 10

// TracingTag is the tracing tag for the cat pool
TracingTag = "cat"
)

// Reactor handles mempool tx broadcasting logic amongst peers. For the main
Expand Down Expand Up @@ -211,11 +213,11 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// flooded the network with transactions.
case *protomem.Txs:
for _, tx := range msg.Txs {
memR.evCollector.WritePoint("mempool", "cat", map[string]interface{}{
"receive_tx": bytes.HexBytes(types.Tx(tx).Hash()).String(),
"peer": e.Src.ID(),
"size": len(tx),
})
memR.evCollector.WritePoint(
mempool.MeasurementTracingTag,
TracingTag,
mempool.TxTracingPoint(mempool.ReceiveTracingFieldValue, e.Src.ID(), tx),
)
}
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
Expand Down Expand Up @@ -259,8 +261,8 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// 3. If we recently evicted the tx and still don't have space for it, we do nothing.
// 4. Else, we request the transaction from that peer.
case *protomem.SeenTx:
memR.evCollector.WritePoint("mempool", "cat", map[string]interface{}{
"receive_seen_tx": cmtbytes.HexBytes(msg.TxKey).String(),
memR.evCollector.WritePoint(mempool.MeasurementTracingTag, TracingTag, map[string]interface{}{
"seen_tx": cmtbytes.HexBytes(msg.TxKey).String(),
})
txKey, err := types.TxKeyFromBytes(msg.TxKey)
if err != nil {
Expand Down Expand Up @@ -289,7 +291,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// A peer is requesting a transaction that we have claimed to have. Find the specified
// transaction and broadcast it to the peer. We may no longer have the transaction
case *protomem.WantTx:
memR.evCollector.WritePoint("mempool", "cat", map[string]interface{}{
memR.evCollector.WritePoint(mempool.MeasurementTracingTag, TracingTag, map[string]interface{}{
"want_tx": cmtbytes.HexBytes(msg.TxKey).String(),
})
txKey, err := types.TxKeyFromBytes(msg.TxKey)
Expand All @@ -301,11 +303,11 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
tx, has := memR.mempool.Get(txKey)
if has && !memR.opts.ListenOnly {
peerID := memR.ids.GetIDForPeer(e.Src.ID())
memR.evCollector.WritePoint("mempool", "cat", map[string]interface{}{
"broadcast_tx": bytes.HexBytes(tx.Hash()).String(),
"peer": peerID,
"size": len(tx),
})
memR.evCollector.WritePoint(
mempool.MeasurementTracingTag,
TracingTag,
mempool.TxTracingPoint(mempool.SendTracingFieldValue, e.Src.ID(), tx),
)
memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID)
if p2p.SendEnvelopeShim(e.Src, p2p.Envelope{ //nolint:staticcheck
ChannelID: mempool.MempoolChannel,
Expand Down
51 changes: 51 additions & 0 deletions mempool/tracing_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package mempool

import (
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)

const (
// MeasurementTracingTag is the tracing tag for the mempool
MeasurementTracingTag = "mempool"

// TxTracingFieldKey is the tracing field key for receiving for sending a
// tx. This should take the form of a tx hash as the value.
TxTracingFieldKey = "tx"

// SizeTracingFieldKey is the tracing field key for the size of a tx. This
// should take the form of the size of the tx as the value.
SizeTracingFieldKey = "size"

// PeerTracingFieldKey is the tracing field key for the peer that sent or
// received a tx. This should take the form of the peer's address as the
// value.
PeerTracingFieldKey = "peer"

// ClassTracingFieldKey is the tracing field key for the class of a tx. This should use either the send or receive value.
ClassTracingFieldKey = "class"

// ReceiveTracingFieldValue is the tracing field value for receiving some
// data from a peer. This value is used by the "class" field key.
ReceiveTracingFieldValue = "receive"

// SendTracingFieldValue is the tracing field value for sending some data
// to a peer. This value is used by the "class" field key.
SendTracingFieldValue = "send"
)

// TxTracingPoint returns a tracing point for a tx using the predetermined
// schema for mempool tracing. This can be used for either receiving or sending
// a tx (change the class).
//
// This is used to create a table in the following schema:
// | time | peerID | class (receiving or sending) | tx size | tx hash |
func TxTracingPoint(class string, peer p2p.ID, tx []byte) map[string]interface{} {
return map[string]interface{}{
TxTracingFieldKey: bytes.HexBytes(types.Tx(tx).Hash()).String(),
PeerTracingFieldKey: peer,
SizeTracingFieldKey: len(tx),
ClassTracingFieldKey: class,
}
}
26 changes: 15 additions & 11 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/gogo/protobuf/proto"

cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
cmtsync "github.com/tendermint/tendermint/libs/sync"
Expand All @@ -19,6 +18,11 @@ import (
"github.com/tendermint/tendermint/types"
)

const (
// TracingTag is the tag used for tracing the v1 mempool reactor.
TracingTag = "v1"
)

// Reactor handles mempool tx broadcasting amongst peers.
// It maintains a map from peer ID to counter, to prevent gossiping txs to the
// peers you received it from.
Expand Down Expand Up @@ -165,11 +169,11 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
switch msg := e.Message.(type) {
case *protomem.Txs:
for _, tx := range msg.Txs {
memR.evCollector.WritePoint("mempool", "v1", map[string]interface{}{
"receive_tx": bytes.HexBytes(types.Tx(tx).Hash()).String(),
"peer": e.Src.ID(),
"size": len(tx),
})
memR.evCollector.WritePoint(
mempool.MeasurementTracingTag,
TracingTag,
mempool.TxTracingPoint(mempool.ReceiveTracingFieldValue, e.Src.ID(), tx),
)
}
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
Expand Down Expand Up @@ -273,11 +277,11 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796
if !memTx.HasPeer(peerID) {
memR.evCollector.WritePoint("mempool", "v1", map[string]interface{}{
"broadcast_tx": bytes.HexBytes(memTx.tx.Hash()).String(),
"peer": peerID,
"size": len(memTx.tx),
})
memR.evCollector.WritePoint(
mempool.MeasurementTracingTag,
TracingTag,
mempool.TxTracingPoint(mempool.SendTracingFieldValue, peer.ID(), memTx.tx),
)
success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: mempool.MempoolChannel,
Message: &protomem.Txs{Txs: [][]byte{memTx.tx}},
Expand Down

0 comments on commit 2c1251b

Please sign in to comment.