Skip to content

Commit

Permalink
Notarization absence detection for leader failover
Browse files Browse the repository at this point in the history
Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Jan 28, 2025
1 parent 6a0721b commit 8504fb9
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 20 deletions.
107 changes: 90 additions & 17 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (

const (
	// NOTE(review): presumably the defaults for Epoch.maxRoundWindow and
	// Epoch.maxPendingBlocks — the code applying them is not visible here; confirm.
	defaultMaxRoundWindow   = 10
	defaultMaxPendingBlocks = 20

	// DefaultMaxProposalWaitTime is the default proposal wait time.
	// NOTE(review): presumably the default for Epoch.maxProposalWait, the grace
	// period a non-leader allows between "a block should be built" and giving up
	// on the round — confirm where it is applied.
	DefaultMaxProposalWaitTime = 5 * time.Second
)

type Round struct {
Expand Down Expand Up @@ -59,20 +61,23 @@ type EpochConfig struct {
// Epoch embeds its EpochConfig and carries the runtime state below.
type Epoch struct {
	EpochConfig
	// Runtime
	sched                          *scheduler
	lock                           sync.Mutex
	lastBlock                      Block // latest block committed
	canReceiveMessages             atomic.Bool
	finishCtx                      context.Context
	finishFn                       context.CancelFunc
	nodes                          []NodeID
	eligibleNodeIDs                map[string]struct{}
	quorumSize                     int
	rounds                         map[uint64]*Round
	futureMessages                 messagesFromNode
	round                          uint64 // The current round we notarize
	maxRoundWindow                 uint64
	maxPendingBlocks               int
	// maxProposalWait is the grace period passed to monitor.WaitUntil once the
	// block builder signals that a block should be built.
	maxProposalWait                time.Duration
	// monitor is created in init and receives the time passed to AdvanceTime.
	monitor                        *Monitor
	// cancelWaitForBlockNotarization aborts the wait installed by
	// monitorProgress. init sets it to a no-op and persistNotarization invokes it.
	cancelWaitForBlockNotarization context.CancelFunc
}

func NewEpoch(conf EpochConfig) (*Epoch, error) {
Expand All @@ -83,8 +88,8 @@ func NewEpoch(conf EpochConfig) (*Epoch, error) {
}

// AdvanceTime hints the engine that the given amount of time has passed.
func (e *Epoch) AdvanceTime(t time.Duration) {

func (e *Epoch) AdvanceTime(t time.Time) {
e.monitor.AdvanceTime(t)
}

// HandleMessage notifies the engine about a reception of a message.
Expand Down Expand Up @@ -124,6 +129,8 @@ func (e *Epoch) HandleMessage(msg *Message, from NodeID) error {

func (e *Epoch) init() error {
e.sched = NewScheduler()
e.monitor = NewMonitor(e.StartTime, e.Logger)
e.cancelWaitForBlockNotarization = func() {}
e.finishCtx, e.finishFn = context.WithCancel(context.Background())
e.nodes = e.Comm.ListNodes()
e.quorumSize = Quorum(len(e.nodes))
Expand Down Expand Up @@ -767,6 +774,9 @@ func (e *Epoch) persistNotarization(notarization Notarization) error {
return err
}

// Notify a block has been notarized, in case we were waiting for it.
e.cancelWaitForBlockNotarization()

notarizationMessage := &Message{Notarization: &notarization}
e.Comm.Broadcast(notarizationMessage)

Expand Down Expand Up @@ -1220,6 +1230,65 @@ func (e *Epoch) metadata() ProtocolMetadata {
return md
}

// triggerProposalWaitTimeExpired is called when the proposal wait time for the
// given round has elapsed. For now it only logs the round and its leader.
func (e *Epoch) triggerProposalWaitTimeExpired(round uint64) {
	e.Logger.Info(
		"Timed out on block agreement",
		zap.Uint64("round", round),
		zap.Stringer("leader", LeaderForRound(e.nodes, round)),
	)
	// TODO: Actually start the empty block agreement
}

// monitorProgress arms leader-failure detection for the given round.
//
// It queues a task on the monitor that blocks until the block builder signals
// that a block should be built (BlockBuilder.IncomingBlock) and then starts a
// deadline of e.maxProposalWait. If that deadline passes without the wait
// having been cancelled, triggerProposalWaitTimeExpired(round) is invoked.
// e.cancelWaitForBlockNotarization is replaced with a function that aborts
// whichever of the two stages is currently in progress.
//
// NOTE(review): this assumes the caller holds e.lock and that
// e.cancelWaitForBlockNotarization is only ever invoked with e.lock held; the
// queued task takes e.lock itself before touching that field — confirm.
func (e *Epoch) monitorProgress(round uint64) {
	e.Logger.Debug("Monitoring progress", zap.Uint64("round", round))

	// A wait left over from an earlier round is stale by now. Abort it instead
	// of silently overwriting its cancel function, otherwise it could still
	// report a timeout for a round we have already left.
	if prev := e.cancelWaitForBlockNotarization; prev != nil {
		prev()
	}

	ctx, cancelContext := context.WithCancel(context.Background())

	noop := func() {}

	// cancelled records that the wait for this round was aborted. It is the
	// source of truth for cancellation; the context and the monitor's stop
	// function are only used to wake up or disarm things early.
	var cancelled atomic.Bool

	proposalWaitTimeExpired := func() {
		// The deadline may fire although the wait was cancelled (for example
		// when the monitor could not disarm it), so check before reporting.
		if cancelled.Load() {
			return
		}
		e.triggerProposalWaitTimeExpired(round)
	}

	blockShouldBeBuiltNotification := func() {
		// ctx only serves the blocking call below; release it once we are done.
		defer cancelContext()

		// This invocation blocks until the block builder tells us it's time to build a new block.
		e.BlockBuilder.IncomingBlock(ctx)

		// While we waited, a block might have been notarized.
		// If so, then don't start monitoring for it being notarized.
		if cancelled.Load() {
			return
		}

		e.Logger.Info("It is time to build a block", zap.Uint64("round", round))

		e.lock.Lock()
		defer e.lock.Unlock()

		// Check again now that we hold the lock: a notarization may have slipped
		// in since the check above, and arming the deadline now would overwrite
		// the cancellation and time out a round that already completed.
		if cancelled.Load() {
			return
		}

		// Once it's time to build a new block, wait a grace period of 'e.maxProposalWait' time,
		// and if the monitor isn't cancelled by then, invoke proposalWaitTimeExpired() above.
		stop := e.monitor.WaitUntil(e.maxProposalWait, proposalWaitTimeExpired)

		// However, if the proposal is notarized before the wait time expires,
		// cancel the above wait procedure.
		e.cancelWaitForBlockNotarization = func() {
			cancelled.Store(true)
			// Tolerate a monitor that hands back no stop function; the
			// cancelled flag above already neutralises the callback.
			if stop != nil {
				stop()
			}
			e.cancelWaitForBlockNotarization = noop
		}
	}

	// If we notarize a block for this round we should cancel the monitor,
	// so first mark it as cancelled and then cancel the context.
	// This is installed before the task is queued so that the task can never
	// have its own cancel function overwritten by this one.
	e.cancelWaitForBlockNotarization = func() {
		cancelled.Store(true)
		cancelContext()
		e.cancelWaitForBlockNotarization = noop
	}

	// Registers a wait operation that:
	// (1) Waits for the block builder to tell us it thinks it's time to build a new block.
	// (2) Registers a monitor which, if not cancelled earlier, notifies the Epoch about a timeout for this round.
	e.monitor.WaitFor(blockShouldBeBuiltNotification)
}

func (e *Epoch) startRound() error {
leaderForCurrentRound := LeaderForRound(e.nodes, e.round)

Expand All @@ -1228,6 +1297,10 @@ func (e *Epoch) startRound() error {
return nil
}

// We're not the leader, so make sure that if a block is not notarized in a timely manner,
// we will agree on an empty block.
e.monitorProgress(e.round)

// If we're not the leader, check if we have received a proposal earlier for this round
msgsForRound, exists := e.futureMessages[string(leaderForCurrentRound)][e.round]
if !exists || msgsForRound.proposal == nil {
Expand Down
67 changes: 64 additions & 3 deletions epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"simplex/wal"
"sync"
"testing"
"time"
)

func TestEpochSimpleFlow(t *testing.T) {
Expand Down Expand Up @@ -371,6 +372,62 @@ func TestEpochBlockTooHighRound(t *testing.T) {
})
}

// TestEpochLeaderFailover runs three rounds to completion, then lets the leader
// of the fourth round stay silent and expects our node to log that it timed
// out waiting for the block to be agreed upon.
func TestEpochLeaderFailover(t *testing.T) {
	// timeoutDetected is closed the first time the timeout message is logged.
	// The sync.Once keeps a repeated log line from closing it twice.
	timeoutDetected := make(chan struct{})
	var signalTimeout sync.Once

	l := makeLogger(t, 1)
	l.intercept(func(entry zapcore.Entry) error {
		if entry.Message == `Timed out on block agreement` {
			signalTimeout.Do(func() { close(timeoutDetected) })
		}
		return nil
	})

	bb := &testBlockBuilder{out: make(chan *testBlock, 1), blockShouldBeBuilt: make(chan struct{}, 1)}
	storage := newInMemStorage()

	nodes := []NodeID{{1}, {2}, {3}, {4}}
	quorum := Quorum(len(nodes))

	start := time.Now()

	conf := EpochConfig{
		StartTime:           start,
		Logger:              l,
		ID:                  nodes[0],
		Signer:              &testSigner{},
		WAL:                 wal.NewMemWAL(t),
		Verifier:            &testVerifier{},
		Storage:             storage,
		Comm:                noopComm(nodes),
		BlockBuilder:        bb,
		SignatureAggregator: &testSignatureAggregator{},
	}

	e, err := NewEpoch(conf)
	require.NoError(t, err)

	require.NoError(t, e.Start())

	// Run through 3 blocks, to make the block proposals be:
	// 1 --> 2 --> 3 --> X (node 4 doesn't propose a block)

	// Then, don't do anything and wait for our node
	// to start complaining about a block not being notarized

	// TODO: in future PRs we will expand this test to also do the actual agreement on the empty block

	rounds := uint64(3)

	for round := uint64(0); round < rounds; round++ {
		notarizeAndFinalizeRound(t, nodes, round, e, bb, quorum, storage)
	}
	bb.blockShouldBeBuilt <- struct{}{}

	// The monitor computes its deadline from the time it knows when it reacts
	// to the signal above, which happens on another goroutine. A single
	// AdvanceTime could therefore land before the deadline exists, so keep
	// moving the clock forward until the timeout is seen.
	// The wait is bounded so a regression fails the test instead of hanging it.
	giveUp := time.After(30 * time.Second)
	retry := time.NewTicker(10 * time.Millisecond)
	defer retry.Stop()

	now := start
	for {
		now = now.Add(DefaultMaxProposalWaitTime + 1)
		e.AdvanceTime(now)

		select {
		case <-timeoutDetected:
			return
		case <-giveUp:
			t.Fatal("timed out waiting for the epoch to detect the missing notarization")
		case <-retry.C:
		}
	}
}

func makeLogger(t *testing.T, node int) *testLogger {
logger, err := zap.NewDevelopment(zap.AddCallerSkip(1))
require.NoError(t, err)
Expand Down Expand Up @@ -521,8 +578,9 @@ func (n noopComm) Broadcast(msg *Message) {
}

// testBlockBuilder is the block builder used by the tests in this file.
type testBlockBuilder struct {
	out chan *testBlock
	in  chan *testBlock
	// blockShouldBeBuilt unblocks IncomingBlock: a value sent on it makes a
	// pending IncomingBlock call return.
	blockShouldBeBuilt chan struct{}
}

// BuildBlock builds a new testblock and sends it to the BlockBuilder channel
Expand All @@ -543,7 +601,10 @@ func (t *testBlockBuilder) BuildBlock(_ context.Context, metadata ProtocolMetada
}

// IncomingBlock blocks until a value is received on t.blockShouldBeBuilt or
// ctx is done, whichever happens first.
func (t *testBlockBuilder) IncomingBlock(ctx context.Context) {
	select {
	case <-t.blockShouldBeBuilt:
	case <-ctx.Done():
	}
}

type testBlock struct {
Expand Down
120 changes: 120 additions & 0 deletions monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package simplex

import (
"context"
"go.uber.org/zap"
"sync/atomic"
"time"
)

// Monitor runs queued tasks and a single deadline callback on one background
// goroutine (started by NewMonitor), driven by the time reported through
// AdvanceTime.
type Monitor struct {
// logger receives debug messages around task and deadline execution.
logger Logger
// close is closed by Close; shouldRun polls it to decide whether run keeps looping.
close chan struct{}
// time holds the latest time.Time stored by NewMonitor or AdvanceTime.
time atomic.Value
// ticks carries the times passed to AdvanceTime to the run loop.
ticks chan time.Time
// tasks carries the functions queued via WaitFor to the run loop.
tasks chan func()
// futureTask holds the *futureTask registered by WaitUntil and examined on every tick.
futureTask atomic.Value
}

// futureTask pairs a callback with the time from which a tick may run it.
type futureTask struct {
// deadline is the earliest tick time at which f is run; tick ignores a zero deadline.
deadline time.Time
// f is the callback to run; tick ignores a nil f.
f func()
}

// NewMonitor creates a Monitor whose notion of "now" starts at startTime and
// launches its background goroutine. Both the task queue and the tick queue
// hold at most one pending element.
func NewMonitor(startTime time.Time, logger Logger) *Monitor {
	var m Monitor
	m.logger = logger
	m.close = make(chan struct{})
	m.tasks = make(chan func(), 1)
	m.ticks = make(chan time.Time, 1)
	m.time.Store(startTime)

	go m.run()

	return &m
}

// AdvanceTime records t as the current time and notifies the run loop.
func (m *Monitor) AdvanceTime(t time.Time) {
	m.time.Store(t)

	// Never block the caller: if a tick is already pending, this one is dropped.
	select {
	case m.ticks <- t:
		return
	default:
		return
	}
}

// tick runs the registered future task if its deadline is not after now.
// taskID is only used to label the log messages.
func (m *Monitor) tick(now time.Time, taskID uint64) {
	task, ok := m.futureTask.Load().(*futureTask)
	if !ok || task == nil {
		return
	}

	if task.f == nil || task.deadline.IsZero() || now.Before(task.deadline) {
		return
	}

	// Claim the task before running it by swapping in an empty one, which also
	// releases the callback's memory. Doing this up front (rather than
	// storing an empty task after f returns) keeps a task that is registered
	// while f runs from being wiped out. If the swap fails, the slot was
	// replaced or cleared in the meantime and this task must no longer run.
	if !m.futureTask.CompareAndSwap(task, &futureTask{}) {
		return
	}

	m.logger.Debug("Executing f", zap.Uint64("taskID", taskID), zap.Time("deadline", task.deadline))
	task.f()
	m.logger.Debug("Executed f with deadline", zap.Uint64("taskID", taskID))
}

// run is the monitor's background loop. It handles ticks and queued tasks one
// at a time, so a task that blocks delays every tick behind it, and returns
// once the monitor is closed.
func (m *Monitor) run() {
	// taskID counts the tasks taken off the queue and labels the log messages.
	var taskID uint64
	for m.shouldRun() {
		select {
		case <-m.close:
			// Without this case an idle loop would stay parked in the select
			// forever and Close could never stop the goroutine.
			return
		case tick := <-m.ticks:
			m.tick(tick, taskID)
		case f := <-m.tasks:
			taskID++
			m.logger.Debug("Executing f", zap.Uint64("taskID", taskID))
			f()
			m.logger.Debug("Task executed", zap.Uint64("taskID", taskID))
		}
	}
}

// shouldRun reports whether the monitor has not been closed yet.
func (m *Monitor) shouldRun() bool {
	running := true
	select {
	case <-m.close:
		running = false
	default:
	}
	return running
}

// Close marks the monitor as closed. Calling it again after it has returned
// is a no-op.
func (m *Monitor) Close() {
	select {
	case <-m.close:
		// Already closed, nothing left to do.
	default:
		close(m.close)
	}
}

// WaitFor queues f to be executed by the monitor's background loop.
// It never blocks: if a task is already queued, f is dropped and will not run.
func (m *Monitor) WaitFor(f func()) {
	select {
	case m.tasks <- f:
	default:
		// Blocking here could stall the caller for as long as the running task
		// takes, so the task is dropped. Log it so the drop is not silent.
		m.logger.Debug("Task queue is full, dropping task")
	}
}

// WaitUntil registers f to run on the first tick whose time is not before
// the monitor's current time plus timeout.
//
// The monitor holds a single such task, so this call replaces any task
// registered earlier. The returned function cancels the registration. It does
// nothing if the task has already been replaced or cleared, and it cannot
// interrupt f once f has started running.
func (m *Monitor) WaitUntil(timeout time.Duration, f func()) context.CancelFunc {
	now := m.time.Load().(time.Time)

	task := &futureTask{
		f:        f,
		deadline: now.Add(timeout),
	}
	m.futureTask.Store(task)

	return func() {
		// Only clear the slot if it still holds this very task, so cancelling
		// late never removes a task registered by a later call.
		m.futureTask.CompareAndSwap(task, &futureTask{})
	}
}
Loading

0 comments on commit 8504fb9

Please sign in to comment.