Skip to content

Commit

Permalink
Support out of order message reception
Browse files Browse the repository at this point in the history
Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Jan 16, 2025
1 parent efab9f8 commit a52af73
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 32 deletions.
96 changes: 69 additions & 27 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {
return nil
}

err1 := e.maybeLoadFutureMessages(e.round)
var err2 error

switch {
Expand All @@ -122,9 +121,11 @@ func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {
err2 = e.handleFinalizationCertificateMessage(msg.FinalizationCertificate, from)
default:
e.Logger.Warn("Invalid message type", zap.Stringer("from", from))
return err1
return nil
}

err1 := e.maybeLoadFutureMessages(e.round)

return errors.Join(err1, err2)

}
Expand Down Expand Up @@ -456,6 +457,8 @@ func (e *Epoch) handleFinalizationMessage(message *Finalization, from NodeID) er
return nil
}

defer e.deleteFutureFinalization(from, finalization.Round)

if round.fCert != nil {
e.Logger.Debug("Received finalization for an already finalized round", zap.Uint64("round", finalization.Round))
return nil
Expand Down Expand Up @@ -497,7 +500,7 @@ func (e *Epoch) handleVoteMessage(message *Vote, from NodeID) error {
// yet we may receive the corresponding vote.
// This may happen if we're asynchronously verifying the proposal at the moment.
if _, exists := e.rounds[vote.Round]; !exists && e.round == vote.Round {
e.Logger.Debug("Received a finalization the current round",
e.Logger.Debug("Received a vote for the current round",
zap.Uint64("round", vote.Round), zap.Stringer("from", from))
e.storeFutureVote(message, from, vote.Round)
return nil
Expand All @@ -506,7 +509,8 @@ func (e *Epoch) handleVoteMessage(message *Vote, from NodeID) error {
// This vote may correspond to a proposal from a future round, or to the proposal of the current round
// which we are still verifying.
if e.round < vote.Round && vote.Round-e.round < e.maxRoundWindow {
e.Logger.Debug("Got vote from a future round", zap.Uint64("round", vote.Round), zap.Uint64("my round", e.round))
e.Logger.Debug("Got vote from a future round",
zap.Uint64("round", vote.Round), zap.Uint64("my round", e.round), zap.Stringer("from", from))
e.storeFutureVote(message, from, vote.Round)
return nil
}
Expand All @@ -523,6 +527,8 @@ func (e *Epoch) handleVoteMessage(message *Vote, from NodeID) error {
return nil
}

defer e.deleteFutureVote(from, vote.Round)

if !e.isVoteValid(vote) {
return nil
}
Expand Down Expand Up @@ -550,6 +556,22 @@ func (e *Epoch) storeFutureVote(message *Vote, from NodeID, round uint64) {
msgsForRound.vote = message
}

func (e *Epoch) deleteFutureVote(from NodeID, round uint64) {
msgsForRound, exists := e.futureMessages[string(from)][round]
if !exists {
return
}
msgsForRound.vote = nil
}

func (e *Epoch) deleteFutureFinalization(from NodeID, round uint64) {
msgsForRound, exists := e.futureMessages[string(from)][round]
if !exists {
return
}
msgsForRound.finalization = nil
}

func (e *Epoch) isFinalizationValid(signature []byte, finalization ToBeSignedFinalization, from NodeID) bool {
if err := finalization.Verify(signature, e.Verifier, from); err != nil {
e.Logger.Debug("Received a finalization with an invalid signature", zap.Uint64("round", finalization.Round), zap.Error(err))
Expand Down Expand Up @@ -667,8 +689,14 @@ func (e *Epoch) maybeCollectNotarization() error {
votesForCurrentRound := e.rounds[e.round].votes
voteCount := len(votesForCurrentRound)

from := make([]NodeID, 0, voteCount)
for _, vote := range votesForCurrentRound {
from = append(from, vote.Signature.Signer)
}

if voteCount < e.quorumSize {
e.Logger.Verbo("Counting votes", zap.Uint64("round", e.round), zap.Int("votes", voteCount))
e.Logger.Verbo("Counting votes", zap.Uint64("round", e.round),
zap.Int("votes", voteCount), zap.String("from", fmt.Sprintf("%s", from)))
return nil
}

Expand Down Expand Up @@ -725,7 +753,7 @@ func (e *Epoch) persistNotarization(notarization Notarization) error {

e.increaseRound()

return e.doNotarized(notarization.Vote.Round)
return errors.Join(e.doNotarized(notarization.Vote.Round), e.maybeLoadFutureMessages(e.round))
}

func (e *Epoch) handleNotarizationMessage(message *Notarization, from NodeID) error {
Expand Down Expand Up @@ -902,6 +930,13 @@ func (e *Epoch) createBlockVerificationTask(block Block, md BlockHeader, from No
e.lock.Lock()
defer e.lock.Unlock()

e.Logger.Debug("Block verification started", zap.Uint64("round", md.Round))
start := time.Now()
defer func() {
elapsed := time.Since(start)
e.Logger.Debug("Block verification ended", zap.Uint64("round", md.Round), zap.Duration("elapsed", elapsed))
}()

if err := block.Verify(); err != nil {
e.Logger.Debug("Failed verifying block", zap.Error(err))
return
Expand Down Expand Up @@ -1123,7 +1158,7 @@ func (e *Epoch) proposeBlock(block Block) error {
zap.Int("size", len(rawBlock)),
zap.Stringer("digest", md.Digest))

return e.handleVoteMessage(&vote, e.ID)
return errors.Join(e.handleVoteMessage(&vote, e.ID), e.maybeLoadFutureMessages(md.Round))
}

func (e *Epoch) Metadata() ProtocolMetadata {
Expand Down Expand Up @@ -1196,7 +1231,7 @@ func (e *Epoch) doProposed(block Block, voteFromLeader Vote, from NodeID) error
return err
}

return e.handleVoteMessage(&voteFromLeader, from)
return errors.Join(e.handleVoteMessage(&voteFromLeader, from), e.maybeLoadFutureMessages(md.Round))
}

func (e *Epoch) voteOnBlock(block Block) (Vote, error) {
Expand Down Expand Up @@ -1269,31 +1304,38 @@ func (e *Epoch) storeNotarization(notarization Notarization) error {
func (e *Epoch) maybeLoadFutureMessages(round uint64) error {
e.Logger.Debug("Loading messages received for this round in the past", zap.Uint64("round", round))

for from, messagesFromNode := range e.futureMessages {
if msgs, exists := messagesFromNode[round]; exists {
if msgs.proposal != nil {
if err := e.handleBlockMessage(msgs.proposal, NodeID(from)); err != nil {
return err
for {
round := e.round
height := e.Storage.Height()

for from, messagesFromNode := range e.futureMessages {
if msgs, exists := messagesFromNode[round]; exists {
if msgs.proposal != nil {
if err := e.handleBlockMessage(msgs.proposal, NodeID(from)); err != nil {
return err
}
msgs.proposal = nil
}
msgs.proposal = nil
}
if msgs.vote != nil {
if err := e.handleVoteMessage(msgs.vote, NodeID(from)); err != nil {
return err
if msgs.vote != nil {
if err := e.handleVoteMessage(msgs.vote, NodeID(from)); err != nil {
return err
}
}
msgs.vote = nil
}
if msgs.finalization != nil {
if err := e.handleFinalizationMessage(msgs.finalization, NodeID(from)); err != nil {
return err
if msgs.finalization != nil {
if err := e.handleFinalizationMessage(msgs.finalization, NodeID(from)); err != nil {
return err
}
}
msgs.finalization = nil
}

if msgs.proposal == nil && msgs.vote == nil && msgs.finalization == nil {
delete(messagesFromNode, round)
if msgs.proposal == nil && msgs.vote == nil && msgs.finalization == nil {
delete(messagesFromNode, round)
}
}
}

if e.round == round && height == e.Storage.Height() {
return nil
}
}

return nil
Expand Down
16 changes: 11 additions & 5 deletions epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestEpochInterleavingMessages(t *testing.T) {
require.NoError(t, err)

callbacks = append(callbacks, func() {
t.Log("Injecting block", block.BlockHeader().Round)
e.HandleMessage(&Message{
BlockMessage: &BlockMessage{
Block: block,
Expand All @@ -158,7 +159,10 @@ func TestEpochInterleavingMessages(t *testing.T) {
}

callbacks = append(callbacks, func() {
e.HandleMessage(&msg, node)
t.Log("Injecting vote for round",
msg.VoteMessage.Vote.Round, msg.VoteMessage.Vote.Digest, msg.VoteMessage.Signature.Signer)
err := e.HandleMessage(&msg, node)
require.NoError(t, err)
})
}

Expand All @@ -170,20 +174,22 @@ func TestEpochInterleavingMessages(t *testing.T) {
Finalization: finalization,
}
callbacks = append(callbacks, func() {
e.HandleMessage(&msg, node)
t.Log("Injecting finalization for round", msg.Finalization.Finalization.Round, msg.Finalization.Finalization.Digest)
err := e.HandleMessage(&msg, node)
require.NoError(t, err)
})
}
}

require.NoError(t, e.Start())

r := rand2.New(rand2.NewSource(time.Now().UnixNano()))
for _, index := range r.Perm(rounds) {
for i, index := range r.Perm(len(callbacks)) {
t.Log("Called callback", i, "out of", len(callbacks))
callbacks[index]()
}

for i := 0; i < rounds; i++ {
fmt.Println(">>>> waiting for block commit of block", i)
storage.waitForBlockCommit(uint64(i))
}
}
Expand All @@ -194,7 +200,7 @@ func TestEpochBlockSentTwice(t *testing.T) {
var tooFarMsg, alreadyReceivedMsg bool

l.intercept(func(entry zapcore.Entry) error {
if entry.Message == "Got block from round too far in the future" {
if entry.Message == "Got block of a future round" {
tooFarMsg = true
}

Expand Down

0 comments on commit a52af73

Please sign in to comment.