From cd0ff6893be243155148f1799a154b7253ea3bb1 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 15 Jul 2024 09:04:39 +0200 Subject: [PATCH 1/4] feat(p2p): render `HasChannel(chID)` is a public `p2p.Peer` method (#3510) Closes: #3472 It also prevents reactors from starting routines intended to send messages to a peer that does not implement/support a given channel. Because all `Send()` or `TrySend()` calls from this routine will be useless, always returning `false` and possibly producing some busy-wait behavior (see https://github.com/cometbft/cometbft/issues/3414). The changes are restricted to: mempool and evidence reactor, because they use a single channel and have a sending routine peer peer, and two of the consensus routines, for Data and Votes. Block and State sync reactors have smarter ways to deal with unresponsive peers, so probably this check is not needed. --- - [x] Tests written/updated - [x] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [x] Updated relevant documentation (`docs/` or `spec/`) and code comments - [x] Title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec --- .../features/3472-p2p-has-channel-api.md | 3 +++ consensus/reactor.go | 8 ++++++ evidence/reactor.go | 4 ++- evidence/reactor_test.go | 1 + mempool/v0/reactor.go | 2 +- p2p/mock/peer.go | 1 + p2p/mocks/peer.go | 18 +++++++++++++ p2p/peer.go | 24 ++++++++--------- p2p/peer_set_test.go | 3 ++- spec/p2p/reactor-api/p2p-api.md | 26 ++++++++++++------- 10 files changed, 65 insertions(+), 25 deletions(-) create mode 100644 .changelog/unreleased/features/3472-p2p-has-channel-api.md diff --git a/.changelog/unreleased/features/3472-p2p-has-channel-api.md b/.changelog/unreleased/features/3472-p2p-has-channel-api.md new file mode 100644 index 00000000000..b554a29ce1d --- /dev/null +++ b/.changelog/unreleased/features/3472-p2p-has-channel-api.md @@ -0,0 +1,3 @@ +- `[p2p]` `HasChannel(chID)` method added to the `Peer` interface, used by + reactors to check whether a peer implements/supports a given channel. + ([#3472](https://github.com/cometbft/cometbft/issues/3472)) diff --git a/consensus/reactor.go b/consensus/reactor.go index 3c8370f97ce..931a7cb9f94 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -583,6 +583,10 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState { func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + if !peer.HasChannel(DataChannel) { + logger.Info("Peer does not implement DataChannel.") + return + } rng := cmtrand.NewStdlibRand() OUTER_LOOP: @@ -743,6 +747,10 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + if !peer.HasChannel(VoteChannel) { + logger.Info("Peer does not implement VoteChannel.") + return + } rng := cmtrand.NewStdlibRand() // Simple hack to throttle logs upon sleep. diff --git a/evidence/reactor.go b/evidence/reactor.go index 677641ffb54..2cb5c8a42ec 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -63,7 +63,9 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. func (evR *Reactor) AddPeer(peer p2p.Peer) { - go evR.broadcastEvidenceRoutine(peer) + if peer.HasChannel(EvidenceChannel) { + go evR.broadcastEvidenceRoutine(peer) + } } // Receive implements Reactor. diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 25aaa147931..ca55cf2b6f3 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -212,6 +212,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) { e, ok := i.(p2p.Envelope) return ok && e.ChannelID == evidence.EvidenceChannel })).Return(false) + p.On("HasChannel", evidence.EvidenceChannel).Maybe().Return(true) quitChan := make(<-chan struct{}) p.On("Quit").Return(quitChan) ps := peerState{2} diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index d3dc4a0dd38..d939419d1c6 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -161,7 +161,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { - if memR.config.Broadcast { + if memR.config.Broadcast && peer.HasChannel(mempool.MempoolChannel) { go func() { // Always forward transactions to unconditional peers. if !memR.Switch.IsPeerUnconditional(peer.ID()) { diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index e34f383f34a..867309c2264 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -43,6 +43,7 @@ func NewPeer(ip net.IP) *Peer { } func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error +func (mp *Peer) HasChannel(_ byte) bool { return true } func (mp *Peer) TrySendEnvelope(e p2p.Envelope) bool { return true } func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true } func (mp *Peer) SendEnvelope(e p2p.Envelope) bool { return true } diff --git a/p2p/mocks/peer.go b/p2p/mocks/peer.go index 7327e7ada20..93560706cef 100644 --- a/p2p/mocks/peer.go +++ b/p2p/mocks/peer.go @@ -67,6 +67,24 @@ func (_m *Peer) GetRemovalFailed() bool { return r0 } +// HasChannel provides a mock function with given fields: chID +func (_m *Peer) HasChannel(chID byte) bool { + ret := _m.Called(chID) + + if len(ret) == 0 { + panic("no return value specified for HasChannel") + } + + var r0 bool + if rf, ok := ret.Get(0).(func(byte) bool); ok { + r0 = rf(chID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *Peer) ID() p2p.ID { ret := _m.Called() diff --git a/p2p/peer.go b/p2p/peer.go index 51ea82b9049..bbf66766608 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -36,6 +36,7 @@ type Peer interface { Status() cmtconn.ConnectionStatus SocketAddr() *NetAddress // actual address of the socket + HasChannel(chID byte) bool // Does the peer implement this channel? SendEnvelope(Envelope) bool TrySendEnvelope(Envelope) bool TrySendMarshalled(MarshalledEnvelope) bool @@ -114,7 +115,7 @@ type peer struct { // peer's node info and the channel it knows about // channels = nodeInfo.Channels - // cached to avoid copying nodeInfo in hasChannel + // cached to avoid copying nodeInfo in HasChannel nodeInfo NodeInfo channels []byte @@ -288,7 +289,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool { if !p.IsRunning() { return false - } else if !p.hasChannel(chID) { + } else if !p.HasChannel(chID) { return false } res := sendFunc(chID, msgBytes) @@ -308,9 +309,8 @@ func (p *peer) Set(key string, data interface{}) { p.Data.Set(key, data) } -// hasChannel returns true if the peer reported -// knowing about the given chID. -func (p *peer) hasChannel(chID byte) bool { +// HasChannel returns whether the peer reported implementing this channel. +func (p *peer) HasChannel(chID byte) bool { for _, ch := range p.channels { if ch == chID { return true @@ -318,13 +318,13 @@ func (p *peer) hasChannel(chID byte) bool { } // NOTE: probably will want to remove this // but could be helpful while the feature is new - // p.Logger.Debug( - // "Unknown channel for peer", - // "channel", - // chID, - // "channels", - // p.channels, - // ) + p.Logger.Debug( + "Unknown channel for peer", + "channel", + chID, + "channels", + p.channels, + ) return false } diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 7a2554426cf..7a59f1f3219 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,7 +18,8 @@ type mockPeer struct { id ID } -func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error +func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore erro +func (mp *mockPeer) HasChannel(byte) bool { return true } func (mp *mockPeer) TrySendMarshalled(e MarshalledEnvelope) bool { return true } func (mp *mockPeer) TrySendEnvelope(e Envelope) bool { return true } diff --git a/spec/p2p/reactor-api/p2p-api.md b/spec/p2p/reactor-api/p2p-api.md index 927e416c72b..550ba830567 100644 --- a/spec/p2p/reactor-api/p2p-api.md +++ b/spec/p2p/reactor-api/p2p-api.md @@ -181,15 +181,16 @@ From this point, reactors can use the methods of the new `Peer` instance. The table below summarizes the interaction of the standard reactors with connected peers, with the `Peer` methods used by them: -| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX | -|--------------------------------------------|-----------|------------|------------|---------|-----------|-------| -| `ID() ID` | x | x | x | x | x | x | -| `IsRunning() bool` | x | | | x | x | | -| `Quit() <-chan struct{}` | | | | x | x | | -| `Get(string) interface{}` | x | | | x | x | | -| `Set(string, interface{})` | x | | | | | | -| `Send(Envelope) bool` | x | x | x | x | x | x | -| `TrySend(Envelope) bool` | x | x | | | | | +| `Peer` API method | consensus | block sync | state sync | mempool | evidence | PEX | +|----------------------------|-----------|------------|------------|---------|----------|-----| +| `ID() ID` | x | x | x | x | x | x | +| `IsRunning() bool` | x | | | x | x | | +| `Quit() <-chan struct{}` | | | | x | x | | +| `Get(string) interface{}` | x | | | x | x | | +| `Set(string, interface{})` | x | | | | | | +| `HasChannel(byte) bool` | x | | | x | x | | +| `Send(Envelope) bool` | x | x | x | x | x | x | +| `TrySend(Envelope) bool` | x | x | | | | | The above list is not exhaustive as it does not include all the `Peer` methods invoked by the PEX reactor, a special component that should be considered part @@ -265,8 +266,10 @@ Finally, a `Peer` instance allows a reactor to send messages to companion reactors running at that peer. This is ultimately the goal of the switch when it provides `Peer` instances to the registered reactors. -There are two methods for sending messages: +There are two methods for sending messages, and one auxiliary method to check +whether the peer supports a given channel: + func (p Peer) HasChannel(chID byte) bool func (p Peer) Send(e Envelope) bool func (p Peer) TrySend(e Envelope) bool @@ -275,6 +278,9 @@ set as follows: - `ChannelID`: the channel the message should be sent through, which defines the reactor that will process the message; + - The auxiliary `HasChannel()` method allows testing whether the remote peer + implements a channel; if it does not, both message-sending methods will + immediately return `false`, as sending always fails. - `Src`: this field represents the source of an incoming message, which is irrelevant for outgoing messages; - `Message`: the actual message's payload, which is marshalled using protocol buffers. From d3853df3354b65872ed8a3c5aaef39f2f1d0226b Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Tue, 23 Jul 2024 13:51:48 +0200 Subject: [PATCH 2/4] One more test fix --- test/fuzz/p2p/pex/reactor_receive.go | 30 +++++++++++++++------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/test/fuzz/p2p/pex/reactor_receive.go b/test/fuzz/p2p/pex/reactor_receive.go index 55831b2384b..7dc5464d0d5 100644 --- a/test/fuzz/p2p/pex/reactor_receive.go +++ b/test/fuzz/p2p/pex/reactor_receive.go @@ -85,17 +85,19 @@ func (fp *fuzzPeer) RemoteIP() net.IP { return net.IPv4(0, 0, 0, 0) } func (fp *fuzzPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: fp.RemoteIP(), Port: 98991, Zone: ""} } -func (fp *fuzzPeer) IsOutbound() bool { return false } -func (fp *fuzzPeer) IsPersistent() bool { return false } -func (fp *fuzzPeer) CloseConn() error { return nil } -func (fp *fuzzPeer) NodeInfo() p2p.NodeInfo { return defaultNodeInfo } -func (fp *fuzzPeer) Status() p2p.ConnectionStatus { var cs p2p.ConnectionStatus; return cs } -func (fp *fuzzPeer) SocketAddr() *p2p.NetAddress { return p2p.NewNetAddress(fp.ID(), fp.RemoteAddr()) } -func (fp *fuzzPeer) SendEnvelope(e p2p.Envelope) bool { return true } -func (fp *fuzzPeer) TrySendEnvelope(e p2p.Envelope) bool { return true } -func (fp *fuzzPeer) Send(_ byte, _ []byte) bool { return true } -func (fp *fuzzPeer) TrySend(_ byte, _ []byte) bool { return true } -func (fp *fuzzPeer) Set(key string, value interface{}) { fp.m[key] = value } -func (fp *fuzzPeer) Get(key string) interface{} { return fp.m[key] } -func (fp *fuzzPeer) GetRemovalFailed() bool { return false } -func (fp *fuzzPeer) SetRemovalFailed() {} +func (fp *fuzzPeer) IsOutbound() bool { return false } +func (fp *fuzzPeer) IsPersistent() bool { return false } +func (fp *fuzzPeer) CloseConn() error { return nil } +func (fp *fuzzPeer) NodeInfo() p2p.NodeInfo { return defaultNodeInfo } +func (fp *fuzzPeer) Status() p2p.ConnectionStatus { var cs p2p.ConnectionStatus; return cs } +func (fp *fuzzPeer) SocketAddr() *p2p.NetAddress { return p2p.NewNetAddress(fp.ID(), fp.RemoteAddr()) } +func (fp *fuzzPeer) HasChannel(byte) bool { return true } +func (fp *fuzzPeer) SendEnvelope(e p2p.Envelope) bool { return true } +func (fp *fuzzPeer) TrySendEnvelope(e p2p.Envelope) bool { return true } +func (fp *fuzzPeer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true } +func (fp *fuzzPeer) Send(_ byte, _ []byte) bool { return true } +func (fp *fuzzPeer) TrySend(_ byte, _ []byte) bool { return true } +func (fp *fuzzPeer) Set(key string, value interface{}) { fp.m[key] = value } +func (fp *fuzzPeer) Get(key string) interface{} { return fp.m[key] } +func (fp *fuzzPeer) GetRemovalFailed() bool { return false } +func (fp *fuzzPeer) SetRemovalFailed() {} From 46db158c57fab2152c2fe03cae3ce939016dc8a2 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 24 Jul 2024 12:22:06 +0200 Subject: [PATCH 3/4] Add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15445c688e1..c386be66bb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased (I think) +* [#128](https://github.com/osmosis-labs/cometbft/pull/128) feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#3510) * [#123](https://github.com/osmosis-labs/cometbft/pull/123) perf(p2p/conn): Remove unneeded global pool buffers in secret connection #3403 * perf(p2p): Delete expensive debug log already slated for deletion #3412 * perf(p2p): Reduce the p2p metrics overhead. #3411 From 4954580d8e07d0fc770a6ba374e2275450c98d0f Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 24 Jul 2024 12:25:18 +0200 Subject: [PATCH 4/4] Update changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c386be66bb8..5bd079a879e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ ## Unreleased (I think) * [#128](https://github.com/osmosis-labs/cometbft/pull/128) feat(p2p): render HasChannel(chID) is a public p2p.Peer method (#3510) +* [#126]() Remove p2p allocations for wrapping outbound packets +* [#125]() Fix marshalling and concurrency overhead within broadcast routines +* perf(p2p): Only update send monitor once per batch packet msg send (#3382) +* [#124]() Secret connection read buffer * [#123](https://github.com/osmosis-labs/cometbft/pull/123) perf(p2p/conn): Remove unneeded global pool buffers in secret connection #3403 * perf(p2p): Delete expensive debug log already slated for deletion #3412 * perf(p2p): Reduce the p2p metrics overhead. #3411