From ab8199dd57a722afd13e31e0935ac46f34091a1d Mon Sep 17 00:00:00 2001 From: Yacov Manevich Date: Mon, 23 Dec 2024 16:59:13 +0100 Subject: [PATCH] Create multi-node test Signed-off-by: Yacov Manevich --- epoch.go | 3 +- epoch_multinode_test.go | 163 ++++++++++++++++++++++++++++++++++++++++ epoch_test.go | 58 +++++++++++--- 3 files changed, 211 insertions(+), 13 deletions(-) create mode 100644 epoch_multinode_test.go diff --git a/epoch.go b/epoch.go index 94e84d6..fbc1f2b 100644 --- a/epoch.go +++ b/epoch.go @@ -1044,12 +1044,13 @@ func (e *Epoch) storeProposal(block Block) bool { // We have already received a block for this round in the past, refuse receiving an alternative block. // We do this because we may have already voted for a different block. // Refuse processing the block to not be coerced into voting for a different block. + e.Logger.Warn("Already received block for round", zap.Uint64("round", md.Round)) return false } round = NewRound(block) e.rounds[md.Round] = round - // We might have receied votes and finalizations from future rounds before we received this block. + // We might have received votes and finalizations from future rounds before we received this block. // So load the messages into our round data structure now that we have created it. e.maybeLoadFutureMessages(md.Round) diff --git a/epoch_multinode_test.go b/epoch_multinode_test.go new file mode 100644 index 0000000..e30660c --- /dev/null +++ b/epoch_multinode_test.go @@ -0,0 +1,163 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package simplex_test + +import ( + "bytes" + "context" + "github.com/stretchr/testify/require" + . "simplex" + "simplex/wal" + "testing" +) + +func TestSimplexMultiNodeSimple(t *testing.T) { + bb := newTestControlledBlockBuilder() + + var net inMemNetwork + n1 := newSimplexNode(t, 1, &net, bb) + n2 := newSimplexNode(t, 2, &net, bb) + n3 := newSimplexNode(t, 3, &net, bb) + n4 := newSimplexNode(t, 4, &net, bb) + + bb.triggerNewBlock() + + instances := []*testInstance{n1, n2, n3, n4} + + for _, n := range instances { + n.start() + } + + for seq := 0; seq < 100; seq++ { + for _, n := range instances { + n.ledger.waitForBlockCommit(uint64(seq)) + bb.triggerNewBlock() + } + } +} + +func newSimplexNode(t *testing.T, id uint8, net *inMemNetwork, bb BlockBuilder) *testInstance { + l := makeLogger(t) + storage := newInMemStorage() + + nodeID := NodeID{id} + + e := &Epoch{ + Comm: &testComm{ + from: nodeID, + net: net, + }, + BlockDigester: blockDigester{}, + Logger: l, + ID: nodeID, + Signer: &testSigner{}, + WAL: &wal.InMemWAL{}, + Verifier: &testVerifier{}, + BlockVerifier: &testVerifier{}, + Storage: storage, + BlockBuilder: bb, + } + + ti := &testInstance{ + e: e, + t: t, + ledger: storage, + ingress: make(chan struct { + msg *Message + from NodeID + }, 100)} + + net.nodes = append(net.nodes, nodeID) + net.instances = append(net.instances, ti) + + return ti +} + +type testInstance struct { + ledger *InMemStorage + e *Epoch + ingress chan struct { + msg *Message + from NodeID + } + t *testing.T +} + +func (t *testInstance) start() { + require.NoError(t.t, t.e.Start()) + go t.run() +} + +func (t *testInstance) run() { + for { + select { + case msg := <-t.ingress: + err := t.e.HandleMessage(msg.msg, msg.from) + require.NoError(t.t, err) + if err != nil { + return + } + } + } +} + +type testComm struct { + from NodeID + net *inMemNetwork +} + +func (c *testComm) ListNodes() []NodeID { + return c.net.nodes +} + +func (c *testComm) SendMessage(msg *Message, destination NodeID) { + for _, instance := range c.net.instances { + if bytes.Equal(instance.e.ID, destination) { + instance.ingress <- struct { + msg *Message + from NodeID + }{msg: msg, from: c.from} + return + } + } +} + +func (c *testComm) Broadcast(msg *Message) { + for _, instance := range c.net.instances { + // Skip sending the message to yourself + if bytes.Equal(c.from, instance.e.ID) { + continue + } + instance.ingress <- struct { + msg *Message + from NodeID + }{msg: msg, from: c.from} + } +} + +type inMemNetwork struct { + nodes []NodeID + instances []*testInstance +} + +type testControlledBlockBuilder struct { + control chan struct{} + testBlockBuilder +} + +func newTestControlledBlockBuilder() *testControlledBlockBuilder { + return &testControlledBlockBuilder{ + control: make(chan struct{}, 1), + testBlockBuilder: make(testBlockBuilder, 1), + } +} + +func (t *testControlledBlockBuilder) triggerNewBlock() { + t.control <- struct{}{} +} + +func (t *testControlledBlockBuilder) BuildBlock(ctx context.Context, metadata ProtocolMetadata) (Block, bool) { + <-t.control + return t.testBlockBuilder.BuildBlock(ctx, metadata) +} diff --git a/epoch_test.go b/epoch_test.go index 1720ead..ca41455 100644 --- a/epoch_test.go +++ b/epoch_test.go @@ -12,6 +12,7 @@ import ( "fmt" . "simplex" "simplex/wal" + "sync" "testing" "github.com/stretchr/testify/require" @@ -22,7 +23,7 @@ import ( func TestEpochSimpleFlow(t *testing.T) { l := makeLogger(t) bb := make(testBlockBuilder, 1) - storage := make(InMemStorage) + storage := newInMemStorage() e := &Epoch{ BlockDigester: blockDigester{}, @@ -66,7 +67,7 @@ func TestEpochSimpleFlow(t *testing.T) { injectFinalization(t, e, block, NodeID{2}) injectFinalization(t, e, block, NodeID{3}) - committedData := storage[uint64(i)].Block.Bytes() + committedData := storage.data[uint64(i)].Block.Bytes() require.Equal(t, block.Bytes(), committedData) } } @@ -226,34 +227,67 @@ func (b blockDigester) Digest(block Block) []byte { return digest[:] } -type InMemStorage map[uint64]struct { - Block - FinalizationCertificate +type InMemStorage struct { + data map[uint64]struct { + Block + FinalizationCertificate + } + + lock sync.Mutex + signal sync.Cond +} + +func newInMemStorage() *InMemStorage { + s := &InMemStorage{ + data: make(map[uint64]struct { + Block + FinalizationCertificate + }), + } + + s.signal = *sync.NewCond(&s.lock) + + return s +} + +func (mem *InMemStorage) waitForBlockCommit(seq uint64) { + mem.lock.Lock() + defer mem.lock.Unlock() + + for { + if _, exists := mem.data[seq]; exists { + return + } + + mem.signal.Wait() + } } -func (mem InMemStorage) Height() uint64 { - return uint64(len(mem)) +func (mem *InMemStorage) Height() uint64 { + return uint64(len(mem.data)) } -func (mem InMemStorage) Retrieve(seq uint64) (Block, FinalizationCertificate, bool) { - item, ok := mem[seq] +func (mem *InMemStorage) Retrieve(seq uint64) (Block, FinalizationCertificate, bool) { + item, ok := mem.data[seq] if !ok { return nil, FinalizationCertificate{}, false } return item.Block, item.FinalizationCertificate, true } -func (mem InMemStorage) Index(seq uint64, block Block, certificate FinalizationCertificate) { - _, ok := mem[seq] +func (mem *InMemStorage) Index(seq uint64, block Block, certificate FinalizationCertificate) { + _, ok := mem.data[seq] if ok { panic(fmt.Sprintf("block with seq %d already indexed!", seq)) } - mem[seq] = struct { + mem.data[seq] = struct { Block FinalizationCertificate }{block, certificate, } + + mem.signal.Signal() } type blockDeserializer struct {