Skip to content

Commit

Permalink
Create multi-node test
Browse files Browse the repository at this point in the history
Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Dec 23, 2024
1 parent c67c88c commit ab8199d
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 13 deletions.
3 changes: 2 additions & 1 deletion epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,12 +1044,13 @@ func (e *Epoch) storeProposal(block Block) bool {
// We have already received a block for this round in the past, refuse receiving an alternative block.
// We do this because we may have already voted for a different block.
// Refuse processing the block to not be coerced into voting for a different block.
e.Logger.Warn("Already received block for round", zap.Uint64("round", md.Round))
return false
}

round = NewRound(block)
e.rounds[md.Round] = round
// We might have receied votes and finalizations from future rounds before we received this block.
// We might have received votes and finalizations from future rounds before we received this block.
// So load the messages into our round data structure now that we have created it.
e.maybeLoadFutureMessages(md.Round)

Expand Down
163 changes: 163 additions & 0 deletions epoch_multinode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package simplex_test

import (
"bytes"
"context"
"github.com/stretchr/testify/require"
. "simplex"
"simplex/wal"
"testing"
)

func TestSimplexMultiNodeSimple(t *testing.T) {
bb := newTestControlledBlockBuilder()

var net inMemNetwork
n1 := newSimplexNode(t, 1, &net, bb)
n2 := newSimplexNode(t, 2, &net, bb)
n3 := newSimplexNode(t, 3, &net, bb)
n4 := newSimplexNode(t, 4, &net, bb)

bb.triggerNewBlock()

instances := []*testInstance{n1, n2, n3, n4}

for _, n := range instances {
n.start()
}

for seq := 0; seq < 100; seq++ {
for _, n := range instances {
n.ledger.waitForBlockCommit(uint64(seq))
bb.triggerNewBlock()
}
}
}

func newSimplexNode(t *testing.T, id uint8, net *inMemNetwork, bb BlockBuilder) *testInstance {
l := makeLogger(t)
storage := newInMemStorage()

nodeID := NodeID{id}

e := &Epoch{
Comm: &testComm{
from: nodeID,
net: net,
},
BlockDigester: blockDigester{},
Logger: l,
ID: nodeID,
Signer: &testSigner{},
WAL: &wal.InMemWAL{},
Verifier: &testVerifier{},
BlockVerifier: &testVerifier{},
Storage: storage,
BlockBuilder: bb,
}

ti := &testInstance{
e: e,
t: t,
ledger: storage,
ingress: make(chan struct {
msg *Message
from NodeID
}, 100)}

net.nodes = append(net.nodes, nodeID)
net.instances = append(net.instances, ti)

return ti
}

type testInstance struct {
ledger *InMemStorage
e *Epoch
ingress chan struct {
msg *Message
from NodeID
}
t *testing.T
}

func (t *testInstance) start() {
require.NoError(t.t, t.e.Start())
go t.run()
}

func (t *testInstance) run() {
for {
select {
case msg := <-t.ingress:
err := t.e.HandleMessage(msg.msg, msg.from)
require.NoError(t.t, err)
if err != nil {
return
}
}
}
}

type testComm struct {
from NodeID
net *inMemNetwork
}

func (c *testComm) ListNodes() []NodeID {
return c.net.nodes
}

func (c *testComm) SendMessage(msg *Message, destination NodeID) {
for _, instance := range c.net.instances {
if bytes.Equal(instance.e.ID, destination) {
instance.ingress <- struct {
msg *Message
from NodeID
}{msg: msg, from: c.from}
return
}
}
}

func (c *testComm) Broadcast(msg *Message) {
for _, instance := range c.net.instances {
// Skip sending the message to yourself
if bytes.Equal(c.from, instance.e.ID) {
continue
}
instance.ingress <- struct {
msg *Message
from NodeID
}{msg: msg, from: c.from}
}
}

type inMemNetwork struct {
nodes []NodeID
instances []*testInstance
}

type testControlledBlockBuilder struct {
control chan struct{}
testBlockBuilder
}

func newTestControlledBlockBuilder() *testControlledBlockBuilder {
return &testControlledBlockBuilder{
control: make(chan struct{}, 1),
testBlockBuilder: make(testBlockBuilder, 1),
}
}

func (t *testControlledBlockBuilder) triggerNewBlock() {
t.control <- struct{}{}
}

func (t *testControlledBlockBuilder) BuildBlock(ctx context.Context, metadata ProtocolMetadata) (Block, bool) {
<-t.control
return t.testBlockBuilder.BuildBlock(ctx, metadata)
}
58 changes: 46 additions & 12 deletions epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
. "simplex"
"simplex/wal"
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -22,7 +23,7 @@ import (
func TestEpochSimpleFlow(t *testing.T) {
l := makeLogger(t)
bb := make(testBlockBuilder, 1)
storage := make(InMemStorage)
storage := newInMemStorage()

e := &Epoch{
BlockDigester: blockDigester{},
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestEpochSimpleFlow(t *testing.T) {
injectFinalization(t, e, block, NodeID{2})
injectFinalization(t, e, block, NodeID{3})

committedData := storage[uint64(i)].Block.Bytes()
committedData := storage.data[uint64(i)].Block.Bytes()
require.Equal(t, block.Bytes(), committedData)
}
}
Expand Down Expand Up @@ -226,34 +227,67 @@ func (b blockDigester) Digest(block Block) []byte {
return digest[:]
}

type InMemStorage map[uint64]struct {
Block
FinalizationCertificate
type InMemStorage struct {
data map[uint64]struct {
Block
FinalizationCertificate
}

lock sync.Mutex
signal sync.Cond
}

func newInMemStorage() *InMemStorage {
s := &InMemStorage{
data: make(map[uint64]struct {
Block
FinalizationCertificate
}),
}

s.signal = *sync.NewCond(&s.lock)

return s
}

func (mem *InMemStorage) waitForBlockCommit(seq uint64) {
mem.lock.Lock()
defer mem.lock.Unlock()

for {
if _, exists := mem.data[seq]; exists {
return
}

mem.signal.Wait()
}
}

func (mem InMemStorage) Height() uint64 {
return uint64(len(mem))
func (mem *InMemStorage) Height() uint64 {
return uint64(len(mem.data))
}

func (mem InMemStorage) Retrieve(seq uint64) (Block, FinalizationCertificate, bool) {
item, ok := mem[seq]
func (mem *InMemStorage) Retrieve(seq uint64) (Block, FinalizationCertificate, bool) {
item, ok := mem.data[seq]
if !ok {
return nil, FinalizationCertificate{}, false
}
return item.Block, item.FinalizationCertificate, true
}

func (mem InMemStorage) Index(seq uint64, block Block, certificate FinalizationCertificate) {
_, ok := mem[seq]
func (mem *InMemStorage) Index(seq uint64, block Block, certificate FinalizationCertificate) {
_, ok := mem.data[seq]
if ok {
panic(fmt.Sprintf("block with seq %d already indexed!", seq))
}
mem[seq] = struct {
mem.data[seq] = struct {
Block
FinalizationCertificate
}{block,
certificate,
}

mem.signal.Signal()
}

type blockDeserializer struct {
Expand Down

0 comments on commit ab8199d

Please sign in to comment.