Skip to content

Commit

Permalink
perf(consensus): Make reactor check for duplicate/old block parts co…
Browse files Browse the repository at this point in the history
  • Loading branch information
ValarDragon committed May 31, 2024
1 parent 9f44480 commit 71186a0
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 13 deletions.
13 changes: 12 additions & 1 deletion consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,18 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}

// Check if the block part is old, or we already have it. If so don't write to WAL.
cs := conR.conS
cs.mtx.RLock()
height, blockParts := cs.Height, cs.ProposalBlockParts
cs.mtx.RUnlock()

allowFutureBlockPart := true
ok := allowProcessingProposalBlockPart(msg, conR.Logger, conR.Metrics, height, blockParts, allowFutureBlockPart, e.Src.ID())
if ok {
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
Expand Down
49 changes: 37 additions & 12 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,31 +1900,56 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
return nil
}

// NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
// once we have the full block.
func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (added bool, err error) {
// checks if we should allow processing the proposal block part.
// Shared code between reactor and state machine.
// This must not modify csBlockParts, only take read-only accesses to it.
// Returns true if the block part is not old or duplicated.
func allowProcessingProposalBlockPart(msg *BlockPartMessage, logger log.Logger, metrics *Metrics, csHeight int64, csBlockParts *types.PartSet, allowFutureHeights bool, peerID p2p.ID) bool {
height, round, part := msg.Height, msg.Round, msg.Part

// Blocks might be reused, so round mismatch is OK
if cs.Height != height {
cs.Logger.Debug("received block part from wrong height", "height", height, "round", round)
cs.metrics.BlockGossipPartsReceived.With("matches_current", "false").Add(1)
return false, nil
// Blocks might be reused, so round mismatch is OK. Meant for reactor, where we may get
// future block parts while the proposal for the next block is still in message queue.
if allowFutureHeights && height > csHeight {
return true
}
if csHeight != height {
logger.Debug("received block part from wrong height", "height", height, "round", round)
metrics.BlockGossipPartsReceived.With("matches_current", "false").Add(1)
return false
}

// We're not expecting a block part.
if cs.ProposalBlockParts == nil {
cs.metrics.BlockGossipPartsReceived.With("matches_current", "false").Add(1)
if csBlockParts == nil {
metrics.BlockGossipPartsReceived.With("matches_current", "false").Add(1)
// NOTE: this can happen when we've gone to a higher round and
// then receive parts from the previous round - not necessarily a bad peer.
cs.Logger.Debug(
logger.Debug(
"received a block part when we are not expecting any",
"height", height,
"round", round,
"index", part.Index,
"peer", peerID,
)
return false
}

if csBlockParts.IsComplete() || csBlockParts.GetPart(int(part.Index)) != nil {
// metrics.DuplicateBlockPart.Add(1)
return false
}

return true
}

// NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
// once we have the full block.
func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (added bool, err error) {
part := msg.Part
// TODO: better handle block parts for future heights, by saving them and processing them later.
allowFutureBlockPart := false
ok := allowProcessingProposalBlockPart(msg, cs.Logger, cs.metrics, cs.Height, cs.ProposalBlockParts, allowFutureBlockPart, peerID)
if !ok {
return false, nil
}

Expand Down

0 comments on commit 71186a0

Please sign in to comment.