From 30ee83d11def3da0a6af6a1fe8ba3f2b4407e26d Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 14 Oct 2024 22:08:38 -0300 Subject: [PATCH 01/12] eventhandler: allow creating removed operator --- eth/eventhandler/event_handler.go | 20 ++++++++++++-------- eth/eventhandler/handlers.go | 21 ++++++++++++++++++--- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/eth/eventhandler/event_handler.go b/eth/eventhandler/event_handler.go index 547e3e04cf..d65ae36b8f 100644 --- a/eth/eventhandler/event_handler.go +++ b/eth/eventhandler/event_handler.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math/big" + "sync" "time" "github.com/ssvlabs/ssv/ekm" @@ -57,14 +58,16 @@ type taskExecutor interface { } type EventHandler struct { - nodeStorage nodestorage.Storage - taskExecutor taskExecutor - eventParser eventparser.Parser - networkConfig networkconfig.NetworkConfig - operatorDataStore operatordatastore.OperatorDataStore - operatorDecrypter keys.OperatorDecrypter - keyManager ekm.KeyManager - beacon beaconprotocol.BeaconNode + nodeStorage nodestorage.Storage + taskExecutor taskExecutor + eventParser eventparser.Parser + networkConfig networkconfig.NetworkConfig + operatorDataStore operatordatastore.OperatorDataStore + operatorDecrypter keys.OperatorDecrypter + keyManager ekm.KeyManager + beacon beaconprotocol.BeaconNode + removedOperators map[string]struct{} + removedOperatorsMu sync.Mutex fullNode bool logger *zap.Logger @@ -91,6 +94,7 @@ func New( operatorDecrypter: operatorDecrypter, keyManager: keyManager, beacon: beacon, + removedOperators: make(map[string]struct{}), logger: zap.NewNop(), metrics: nopMetrics{}, } diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 2b4ce2782c..8035f0af06 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -69,7 +69,12 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont if err != nil { return fmt.Errorf("could not get operator data by public key: %w", err) } - if pubkeyExists { + + eh.removedOperatorsMu.Lock() + _, removed := eh.removedOperators[string(od.PublicKey)] + eh.removedOperatorsMu.Unlock() + + if pubkeyExists && !removed { logger.Warn("malformed event: operator public key already exists", fields.OperatorPubKey(operatorData.PublicKey)) return &MalformedEventError{Err: ErrOperatorPubkeyAlreadyExists} @@ -79,7 +84,8 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont if err != nil { return fmt.Errorf("save operator data: %w", err) } - if exists { + + if exists && !removed { logger.Debug("operator data already exists") return nil } @@ -89,6 +95,10 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont logger = logger.With(zap.Bool("own_operator", true)) } + eh.removedOperatorsMu.Lock() + delete(eh.removedOperators, string(od.PublicKey)) + eh.removedOperatorsMu.Unlock() + eh.metrics.OperatorPublicKey(od.ID, od.PublicKey) logger.Debug("processed event") @@ -117,7 +127,12 @@ func (eh *EventHandler) handleOperatorRemoved(txn basedb.Txn, event *contract.Co fields.Owner(od.OwnerAddress), ) - // This function is currently no-op and it will do nothing. Operator removed event is not used in the current implementation. + // This function is currently just maintaining the set of removed validators, and it will do nothing beyond that. + // The set of removed validators allows registering a validator with a pubkey that was once removed. + // Operator removed event is not used for anything else in the current implementation. + eh.removedOperatorsMu.Lock() + eh.removedOperators[string(od.PublicKey)] = struct{}{} + eh.removedOperatorsMu.Unlock() logger.Debug("processed event") return nil From a77c3d3c9a2554a28f5bc0d38856afc3feb0215b Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 17 Oct 2024 00:19:19 -0300 Subject: [PATCH 02/12] delete operator data and liquidate shares on OperatorRemoved event; un-liquidate shares on OperatorAdded event --- eth/eventhandler/event_handler.go | 23 ++++------- eth/eventhandler/handlers.go | 69 ++++++++++++++++++++++++------- 2 files changed, 62 insertions(+), 30 deletions(-) diff --git a/eth/eventhandler/event_handler.go b/eth/eventhandler/event_handler.go index d65ae36b8f..e96ece694a 100644 --- a/eth/eventhandler/event_handler.go +++ b/eth/eventhandler/event_handler.go @@ -6,17 +6,15 @@ import ( "errors" "fmt" "math/big" - "sync" "time" - "github.com/ssvlabs/ssv/ekm" - "github.com/attestantio/go-eth2-client/spec/phase0" ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" spectypes "github.com/ssvlabs/ssv-spec/types" "go.uber.org/zap" + "github.com/ssvlabs/ssv/ekm" "github.com/ssvlabs/ssv/eth/contract" "github.com/ssvlabs/ssv/eth/eventparser" "github.com/ssvlabs/ssv/eth/executionclient" @@ -58,16 +56,14 @@ type taskExecutor interface { } type EventHandler struct { - nodeStorage nodestorage.Storage - taskExecutor taskExecutor - eventParser eventparser.Parser - networkConfig networkconfig.NetworkConfig - operatorDataStore operatordatastore.OperatorDataStore - operatorDecrypter keys.OperatorDecrypter - keyManager ekm.KeyManager - beacon beaconprotocol.BeaconNode - removedOperators map[string]struct{} - removedOperatorsMu sync.Mutex + nodeStorage nodestorage.Storage + taskExecutor taskExecutor + eventParser eventparser.Parser + networkConfig networkconfig.NetworkConfig + operatorDataStore operatordatastore.OperatorDataStore + operatorDecrypter keys.OperatorDecrypter + keyManager ekm.KeyManager + beacon beaconprotocol.BeaconNode fullNode bool logger *zap.Logger @@ -94,7 +90,6 @@ func New( operatorDecrypter: operatorDecrypter, keyManager: keyManager, beacon: beacon, - removedOperators: make(map[string]struct{}), logger: zap.NewNop(), metrics: nopMetrics{}, } diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 8035f0af06..cd0a46ff08 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -70,11 +70,7 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont return fmt.Errorf("could not get operator data by public key: %w", err) } - eh.removedOperatorsMu.Lock() - _, removed := eh.removedOperators[string(od.PublicKey)] - eh.removedOperatorsMu.Unlock() - - if pubkeyExists && !removed { + if pubkeyExists { logger.Warn("malformed event: operator public key already exists", fields.OperatorPubKey(operatorData.PublicKey)) return &MalformedEventError{Err: ErrOperatorPubkeyAlreadyExists} @@ -85,20 +81,51 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont return fmt.Errorf("save operator data: %w", err) } - if exists && !removed { + if exists { logger.Debug("operator data already exists") return nil } + var modifiedShares []*ssvtypes.SSVShare + for _, share := range eh.nodeStorage.Shares().List(txn, registrystorage.ByOperatorID(event.OperatorId)) { + if !share.Liquidated { + continue + } + + var missingOtherOperators bool + for _, shareMember := range share.Committee { + if shareMember.Signer == event.OperatorId { + continue + } + + _, ok, err := eh.nodeStorage.GetOperatorData(txn, shareMember.Signer) + if err != nil { + return fmt.Errorf("get operator data: %w", err) + } + + if !ok { + missingOtherOperators = true + break + } + } + + if !missingOtherOperators { + share.Liquidated = false + modifiedShares = append(modifiedShares, share) + } + } + + if len(modifiedShares) > 0 { + if err := eh.nodeStorage.Shares().Save(txn, modifiedShares...); err != nil { + return fmt.Errorf("save shares: %w", err) + } + } + if bytes.Equal(event.PublicKey, eh.operatorDataStore.GetOperatorData().PublicKey) { eh.operatorDataStore.SetOperatorData(od) logger = logger.With(zap.Bool("own_operator", true)) } - eh.removedOperatorsMu.Lock() - delete(eh.removedOperators, string(od.PublicKey)) - eh.removedOperatorsMu.Unlock() - eh.metrics.OperatorPublicKey(od.ID, od.PublicKey) logger.Debug("processed event") @@ -127,12 +154,22 @@ func (eh *EventHandler) handleOperatorRemoved(txn basedb.Txn, event *contract.Co fields.Owner(od.OwnerAddress), ) - // This function is currently just maintaining the set of removed validators, and it will do nothing beyond that. - // The set of removed validators allows registering a validator with a pubkey that was once removed. - // Operator removed event is not used for anything else in the current implementation. - eh.removedOperatorsMu.Lock() - eh.removedOperators[string(od.PublicKey)] = struct{}{} - eh.removedOperatorsMu.Unlock() + err = eh.nodeStorage.DeleteOperatorData(txn, od.ID) + if err != nil { + return err + } + + var modifiedShares []*ssvtypes.SSVShare + for _, share := range eh.nodeStorage.Shares().List(txn, registrystorage.ByOperatorID(event.OperatorId)) { + share.Liquidated = true + modifiedShares = append(modifiedShares, share) + } + + if len(modifiedShares) > 0 { + if err := eh.nodeStorage.Shares().Save(txn, modifiedShares...); err != nil { + return fmt.Errorf("save shares: %w", err) + } + } logger.Debug("processed event") return nil From e8660f2fd27bbcb2cc083b022ac7fd31a5d509dd Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 17 Oct 2024 00:28:10 -0300 Subject: [PATCH 03/12] wrap error when deleting operator data --- eth/eventhandler/handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index cd0a46ff08..c1a8b5a844 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -156,7 +156,7 @@ func (eh *EventHandler) handleOperatorRemoved(txn basedb.Txn, event *contract.Co err = eh.nodeStorage.DeleteOperatorData(txn, od.ID) if err != nil { - return err + return fmt.Errorf("delete operator data: %w", err) } var modifiedShares []*ssvtypes.SSVShare From 724a2ab71979949fdaed0ddef9f2024da1bd4a92 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 17 Oct 2024 09:56:18 -0300 Subject: [PATCH 04/12] invert condition --- eth/eventhandler/handlers.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index c1a8b5a844..27e02cd2e5 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -92,10 +92,10 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont continue } - var missingOtherOperators bool + var existingOperatorsCount int for _, shareMember := range share.Committee { if shareMember.Signer == event.OperatorId { - continue + existingOperatorsCount++ } _, ok, err := eh.nodeStorage.GetOperatorData(txn, shareMember.Signer) @@ -103,13 +103,12 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont return fmt.Errorf("get operator data: %w", err) } - if !ok { - missingOtherOperators = true - break + if ok { + existingOperatorsCount++ } } - if !missingOtherOperators { + if existingOperatorsCount == len(share.Committee) { share.Liquidated = false modifiedShares = append(modifiedShares, share) } From 56205565e9c2ded956d2b69e1122c31e0de19cb8 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 17 Oct 2024 20:58:26 -0300 Subject: [PATCH 05/12] fix event handler tests --- eth/eventhandler/event_handler_test.go | 225 ++++++++++++------------- eth/eventhandler/handlers.go | 3 +- 2 files changed, 113 insertions(+), 115 deletions(-) diff --git a/eth/eventhandler/event_handler_test.go b/eth/eventhandler/event_handler_test.go index d4b0d2a447..577957c41c 100644 --- a/eth/eventhandler/event_handler_test.go +++ b/eth/eventhandler/event_handler_test.go @@ -256,119 +256,6 @@ func TestHandleBlockEventsStream(t *testing.T) { require.Equal(t, len(ops), len(operators)) }) }) - t.Run("test OperatorRemoved event handle", func(t *testing.T) { - - // Should return MalformedEventError and no changes to the state - t.Run("test OperatorRemoved incorrect operator ID", func(t *testing.T) { - // Call the contract method - _, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 100500) - require.NoError(t, err) - sim.Commit() - - block := <-logs - require.NotEmpty(t, block.Logs) - require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0]) - - eventsCh := make(chan executionclient.BlockLogs) - go func() { - defer close(eventsCh) - eventsCh <- block - }() - - // Check that there is 1 registered operator - operators, err := eh.nodeStorage.ListOperators(nil, 0, 0) - require.NoError(t, err) - require.Equal(t, len(ops), len(operators)) - - // Handle the event - lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false) - require.Equal(t, blockNum+1, lastProcessedBlock) - require.NoError(t, err) - blockNum++ - - // Check if the operator wasn't removed successfully - operators, err = eh.nodeStorage.ListOperators(nil, 0, 0) - require.NoError(t, err) - require.Equal(t, len(ops), len(operators)) - }) - - // Receive event, unmarshall, parse, check parse event is not nil or with error, operator id is correct - // TODO: fix this test. It checks nothing, due the handleOperatorRemoved method is no-op currently - t.Run("test OperatorRemoved happy flow", func(t *testing.T) { - // Prepare a new operator to remove it later in this test - op, err := createOperators(1, operatorsCount) - require.NoError(t, err) - operatorsCount++ - - encodedPubKey, err := op[0].privateKey.Public().Base64() - require.NoError(t, err) - - // Call the contract method - packedOperatorPubKey, err := eventparser.PackOperatorPublicKey(encodedPubKey) - require.NoError(t, err) - _, err = boundContract.SimcontractTransactor.RegisterOperator(auth, packedOperatorPubKey, big.NewInt(100_000_000)) - require.NoError(t, err) - - sim.Commit() - - block := <-logs - require.NotEmpty(t, block.Logs) - require.Equal(t, ethcommon.HexToHash("0xd839f31c14bd632f424e307b36abff63ca33684f77f28e35dc13718ef338f7f4"), block.Logs[0].Topics[0]) - - eventsCh := make(chan executionclient.BlockLogs) - go func() { - defer close(eventsCh) - eventsCh <- block - }() - - // Check that there is no registered operators - operators, err := eh.nodeStorage.ListOperators(nil, 0, 0) - require.NoError(t, err) - require.Equal(t, len(ops), len(operators)) - - // Handle OperatorAdded event - lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false) - require.Equal(t, blockNum+1, lastProcessedBlock) - require.NoError(t, err) - blockNum++ - // Check storage for the new operator - operators, err = eh.nodeStorage.ListOperators(nil, 0, 0) - require.NoError(t, err) - require.Equal(t, len(ops)+1, len(operators)) - - // Now start the OperatorRemoved event handling - // Call the contract method - _, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 4) - require.NoError(t, err) - sim.Commit() - - block = <-logs - require.NotEmpty(t, block.Logs) - require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0]) - - eventsCh = make(chan executionclient.BlockLogs) - go func() { - defer close(eventsCh) - eventsCh <- block - }() - - operators, err = eh.nodeStorage.ListOperators(nil, 0, 0) - require.NoError(t, err) - require.Equal(t, len(ops)+1, len(operators)) - - // Handle OperatorRemoved event - lastProcessedBlock, err = eh.HandleBlockEventsStream(eventsCh, false) - require.Equal(t, blockNum+1, lastProcessedBlock) - require.NoError(t, err) - blockNum++ - - // TODO: this should be adjusted when eth/eventhandler/handlers.go#L109 is resolved - // Check if the operator was removed successfully - //operators, err = eh.nodeStorage.ListOperators(nil, 0, 0) - //require.NoError(t, err) - //require.Equal(t, len(ops), len(operators)) - }) - }) // Receive event, unmarshall, parse, check parse event is not nil or with an error, // public key is correct, owner is correct, operator ids are correct, shares are correct @@ -1346,6 +1233,118 @@ func TestHandleBlockEventsStream(t *testing.T) { require.False(t, share.Liquidated) }) }) + + t.Run("test OperatorRemoved event handle", func(t *testing.T) { + + // Should return MalformedEventError and no changes to the state + t.Run("test OperatorRemoved incorrect operator ID", func(t *testing.T) { + // Call the contract method + _, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 100500) + require.NoError(t, err) + sim.Commit() + + block := <-logs + require.NotEmpty(t, block.Logs) + require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0]) + + eventsCh := make(chan executionclient.BlockLogs) + go func() { + defer close(eventsCh) + eventsCh <- block + }() + + // Check that there is 1 registered operator + operators, err := eh.nodeStorage.ListOperators(nil, 0, 0) + require.NoError(t, err) + require.Equal(t, len(ops), len(operators)) + + // Handle the event + lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false) + require.Equal(t, blockNum+1, lastProcessedBlock) + require.NoError(t, err) + blockNum++ + + // Check if the operator wasn't removed successfully + operators, err = eh.nodeStorage.ListOperators(nil, 0, 0) + require.NoError(t, err) + require.Equal(t, len(ops), len(operators)) + }) + + // Receive event, unmarshall, parse, check parse event is not nil or with error, operator id is correct + t.Run("test OperatorRemoved happy flow", func(t *testing.T) { + // Prepare a new operator to remove it later in this test + op, err := createOperators(1, operatorsCount) + require.NoError(t, err) + operatorsCount++ + + encodedPubKey, err := op[0].privateKey.Public().Base64() + require.NoError(t, err) + + // Call the contract method + packedOperatorPubKey, err := eventparser.PackOperatorPublicKey(encodedPubKey) + require.NoError(t, err) + _, err = boundContract.SimcontractTransactor.RegisterOperator(auth, packedOperatorPubKey, big.NewInt(100_000_000)) + require.NoError(t, err) + + sim.Commit() + + block := <-logs + require.NotEmpty(t, block.Logs) + require.Equal(t, ethcommon.HexToHash("0xd839f31c14bd632f424e307b36abff63ca33684f77f28e35dc13718ef338f7f4"), block.Logs[0].Topics[0]) + + eventsCh := make(chan executionclient.BlockLogs) + go func() { + defer close(eventsCh) + eventsCh <- block + }() + + // Check that there is no registered operators + operators, err := eh.nodeStorage.ListOperators(nil, 0, 0) + require.NoError(t, err) + require.Equal(t, len(ops), len(operators)) + + // Handle OperatorAdded event + lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false) + require.Equal(t, blockNum+1, lastProcessedBlock) + require.NoError(t, err) + blockNum++ + // Check storage for the new operator + operators, err = eh.nodeStorage.ListOperators(nil, 0, 0) + require.NoError(t, err) + require.Equal(t, len(ops)+1, len(operators)) + + // Now start the OperatorRemoved event handling + // Call the contract method + _, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 4) + require.NoError(t, err) + sim.Commit() + + block = <-logs + require.NotEmpty(t, block.Logs) + require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0]) + + eventsCh = make(chan executionclient.BlockLogs) + go func() { + defer close(eventsCh) + eventsCh <- block + }() + + operators, err = eh.nodeStorage.ListOperators(nil, 0, 0) + require.NoError(t, err) + require.Equal(t, len(ops)+1, len(operators)) + + // Handle OperatorRemoved event + lastProcessedBlock, err = eh.HandleBlockEventsStream(eventsCh, false) + require.Equal(t, blockNum+1, lastProcessedBlock) + require.NoError(t, err) + blockNum++ + + // Check if the operator was removed successfully + operators, err = eh.nodeStorage.ListOperators(nil, 0, 0) + require.NoError(t, err) + require.Equal(t, len(ops), len(operators)) + }) + }) } func setupEventHandler(t *testing.T, ctx context.Context, logger *zap.Logger, network *networkconfig.NetworkConfig, operator *testOperator, useMockCtrl bool) (*EventHandler, *mocks.MockController, error) { diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 27e02cd2e5..07cf4a25bd 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -153,8 +153,7 @@ func (eh *EventHandler) handleOperatorRemoved(txn basedb.Txn, event *contract.Co fields.Owner(od.OwnerAddress), ) - err = eh.nodeStorage.DeleteOperatorData(txn, od.ID) - if err != nil { + if err := eh.nodeStorage.DeleteOperatorData(txn, od.ID); err != nil { return fmt.Errorf("delete operator data: %w", err) } From 444bbc2248d8259dfff54239ddcfc8ad92b45393 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 31 Oct 2024 14:13:12 -0300 Subject: [PATCH 06/12] fix double self calculation logic --- eth/eventhandler/handlers.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 07cf4a25bd..84b903a500 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -94,10 +94,6 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont var existingOperatorsCount int for _, shareMember := range share.Committee { - if shareMember.Signer == event.OperatorId { - existingOperatorsCount++ - } - _, ok, err := eh.nodeStorage.GetOperatorData(txn, shareMember.Signer) if err != nil { return fmt.Errorf("get operator data: %w", err) From 0fc0a0afda6424297508e0de032bd34d0fe81ed8 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 31 Oct 2024 14:47:21 -0300 Subject: [PATCH 07/12] Add a comment --- eth/eventhandler/handlers.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 84b903a500..30dab5ecc4 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -89,6 +89,9 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont var modifiedShares []*ssvtypes.SSVShare for _, share := range eh.nodeStorage.Shares().List(txn, registrystorage.ByOperatorID(event.OperatorId)) { if !share.Liquidated { + // Skip non-liquidated shares since they are already active. + // A share may become liquidated on OperatorRemoved or ClusterLiquidated events. + // On ValidatorRemoved the share is removed from the storage, so it won't be reactivated on OperatorAdded. continue } From 93b3aa5b13dea40d59906820b5b02e20d89d5220 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 31 Oct 2024 14:48:35 -0300 Subject: [PATCH 08/12] Revert "fix double self calculation logic" This reverts commit 444bbc2248d8259dfff54239ddcfc8ad92b45393. --- eth/eventhandler/handlers.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 30dab5ecc4..c5eb9e3426 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -97,6 +97,10 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont var existingOperatorsCount int for _, shareMember := range share.Committee { + if shareMember.Signer == event.OperatorId { + existingOperatorsCount++ + } + _, ok, err := eh.nodeStorage.GetOperatorData(txn, shareMember.Signer) if err != nil { return fmt.Errorf("get operator data: %w", err) From ba53fe12e8ba784cf6eeb568566f3aa2a275b105 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 31 Oct 2024 14:50:29 -0300 Subject: [PATCH 09/12] fix double self calculation differently --- eth/eventhandler/handlers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index c5eb9e3426..7f765ef7bc 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -99,6 +99,7 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont for _, shareMember := range share.Committee { if shareMember.Signer == event.OperatorId { existingOperatorsCount++ + continue } _, ok, err := eh.nodeStorage.GetOperatorData(txn, shareMember.Signer) From a2060e2d0b91802cf0cb038543a9805e4aa9a4d2 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Sat, 28 Dec 2024 00:59:34 -0300 Subject: [PATCH 10/12] eth/eventhandler: fix tests --- eth/eventhandler/event_handler_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eth/eventhandler/event_handler_test.go b/eth/eventhandler/event_handler_test.go index 26508367a0..4ffc4e6c1e 100644 --- a/eth/eventhandler/event_handler_test.go +++ b/eth/eventhandler/event_handler_test.go @@ -1259,7 +1259,7 @@ func TestHandleBlockEventsStream(t *testing.T) { require.Equal(t, len(ops), len(operators)) // Handle the event - lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false) + lastProcessedBlock, err := eh.HandleBlockEventsStream(ctx, eventsCh, false) require.Equal(t, blockNum+1, lastProcessedBlock) require.NoError(t, err) blockNum++ @@ -1304,7 +1304,7 @@ func TestHandleBlockEventsStream(t *testing.T) { require.Equal(t, len(ops), len(operators)) // Handle OperatorAdded event - lastProcessedBlock, err := eh.HandleBlockEventsStream(eventsCh, false) + lastProcessedBlock, err := eh.HandleBlockEventsStream(ctx, eventsCh, false) require.Equal(t, blockNum+1, lastProcessedBlock) require.NoError(t, err) blockNum++ @@ -1334,7 +1334,7 @@ func TestHandleBlockEventsStream(t *testing.T) { require.Equal(t, len(ops)+1, len(operators)) // Handle OperatorRemoved event - lastProcessedBlock, err = eh.HandleBlockEventsStream(eventsCh, false) + lastProcessedBlock, err = eh.HandleBlockEventsStream(ctx, eventsCh, false) require.Equal(t, blockNum+1, lastProcessedBlock) require.NoError(t, err) blockNum++ From 6c24b6a44560e0cd603f8a93768c022962e1774e Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Sat, 28 Dec 2024 01:00:04 -0300 Subject: [PATCH 11/12] liquidate share on operator removal if there's no quorum --- eth/eventhandler/handlers.go | 14 ++++++--- .../peers/connections/mock/mock_storage.go | 5 +++ operator/storage/storage.go | 4 +++ registry/storage/operators.go | 31 +++++++++++++++++++ 4 files changed, 50 insertions(+), 4 deletions(-) diff --git a/eth/eventhandler/handlers.go b/eth/eventhandler/handlers.go index 2e1291909b..fd992fb8e6 100644 --- a/eth/eventhandler/handlers.go +++ b/eth/eventhandler/handlers.go @@ -99,7 +99,7 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont continue } - var existingOperatorsCount int + var existingOperatorsCount uint64 for _, shareMember := range share.Committee { if shareMember.Signer == event.OperatorId { existingOperatorsCount++ @@ -116,7 +116,7 @@ func (eh *EventHandler) handleOperatorAdded(txn basedb.Txn, event *contract.Cont } } - if existingOperatorsCount == len(share.Committee) { + if existingOperatorsCount >= share.Quorum() { share.Liquidated = false modifiedShares = append(modifiedShares, share) } @@ -166,8 +166,14 @@ func (eh *EventHandler) handleOperatorRemoved(txn basedb.Txn, event *contract.Co var modifiedShares []*ssvtypes.SSVShare for _, share := range eh.nodeStorage.Shares().List(txn, registrystorage.ByOperatorID(event.OperatorId)) { - share.Liquidated = true - modifiedShares = append(modifiedShares, share) + exists, err := eh.nodeStorage.QuorumExists(txn, share.OperatorIDs(), share.Quorum()) + if err != nil { + return fmt.Errorf("check if operator exists: %w", err) + } + if !exists { + share.Liquidated = true + modifiedShares = append(modifiedShares, share) + } } if len(modifiedShares) > 0 { diff --git a/network/peers/connections/mock/mock_storage.go b/network/peers/connections/mock/mock_storage.go index 93e5493c17..b025ef0103 100644 --- a/network/peers/connections/mock/mock_storage.go +++ b/network/peers/connections/mock/mock_storage.go @@ -76,6 +76,11 @@ func (m NodeStorage) OperatorsExist(r basedb.Reader, ids []spectypes.OperatorID) panic("implement me") } +func (m NodeStorage) QuorumExists(r basedb.Reader, ids []spectypes.OperatorID, quorum uint64) (bool, error) { + //TODO implement me + panic("implement me") +} + func (m NodeStorage) SaveOperatorData(txn basedb.ReadWriter, operatorData *registrystorage.OperatorData) (bool, error) { //TODO implement me panic("implement me") diff --git a/operator/storage/storage.go b/operator/storage/storage.go index 6ac14ba619..8ec61be42d 100644 --- a/operator/storage/storage.go +++ b/operator/storage/storage.go @@ -106,6 +106,10 @@ func (s *storage) OperatorsExist(r basedb.Reader, ids []spectypes.OperatorID) (b return s.operatorStore.OperatorsExist(r, ids) } +func (s *storage) QuorumExists(r basedb.Reader, ids []spectypes.OperatorID, quorum uint64) (bool, error) { + return s.operatorStore.QuorumExists(r, ids, quorum) +} + func (s *storage) SaveOperatorData(rw basedb.ReadWriter, operatorData *registrystorage.OperatorData) (bool, error) { return s.operatorStore.SaveOperatorData(rw, operatorData) } diff --git a/registry/storage/operators.go b/registry/storage/operators.go index 1936acbbd4..3d8636dd7f 100644 --- a/registry/storage/operators.go +++ b/registry/storage/operators.go @@ -34,6 +34,7 @@ type Operators interface { GetOperatorDataByPubKey(r basedb.Reader, operatorPubKey []byte) (*OperatorData, bool, error) GetOperatorData(r basedb.Reader, id spectypes.OperatorID) (*OperatorData, bool, error) OperatorsExist(r basedb.Reader, ids []spectypes.OperatorID) (bool, error) + QuorumExists(r basedb.Reader, ids []spectypes.OperatorID, quorum uint64) (bool, error) SaveOperatorData(rw basedb.ReadWriter, operatorData *OperatorData) (bool, error) DeleteOperatorData(rw basedb.ReadWriter, id spectypes.OperatorID) error ListOperators(r basedb.Reader, from uint64, to uint64) ([]OperatorData, error) @@ -97,6 +98,14 @@ func (s *operatorsStorage) OperatorsExist( return s.operatorsExist(r, ids) } +// QuorumExists returns if existing operators from the list build a quorum +func (s *operatorsStorage) QuorumExists(r basedb.Reader, ids []spectypes.OperatorID, quorum uint64) (bool, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.quorumExists(r, ids, quorum) +} + // GetOperatorDataByPubKey returns data of the given operator by public key func (s *operatorsStorage) GetOperatorDataByPubKey( r basedb.Reader, @@ -162,6 +171,28 @@ func (s *operatorsStorage) operatorsExist( return seen == len(ids), nil } +func (s *operatorsStorage) quorumExists( + r basedb.Reader, + ids []spectypes.OperatorID, + quorum uint64, +) (bool, error) { + var keys [][]byte + for _, id := range ids { + keys = append(keys, buildOperatorKey(id)) + } + + seen := uint64(0) + err := s.db.UsingReader(r).GetMany(s.prefix, keys, func(obj basedb.Obj) error { + seen++ + return nil + }) + if err != nil { + return false, err + } + + return seen > quorum, nil +} + func (s *operatorsStorage) listOperators(r basedb.Reader, from, to uint64) ([]OperatorData, error) { var operators []OperatorData err := s.db.UsingReader(r). From 8b8abeea19a0dcc91e24233cfd1298dc8d4fede0 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Sat, 28 Dec 2024 01:45:11 -0300 Subject: [PATCH 12/12] message/validation: check if operator exists --- cli/operator/node.go | 2 + message/validation/errors.go | 1 + message/validation/validation.go | 13 +++ message/validation/validation_test.go | 153 +++++++++++++------------- network/p2p/test_utils.go | 2 + network/topics/controller_test.go | 9 +- network/topics/msg_validator_test.go | 2 +- 7 files changed, 106 insertions(+), 76 deletions(-) diff --git a/cli/operator/node.go b/cli/operator/node.go index 8c22739922..10cc94b1ec 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/ssvlabs/ssv/api/handlers" apiserver "github.com/ssvlabs/ssv/api/server" "github.com/ssvlabs/ssv/beacon/goclient" @@ -234,6 +235,7 @@ var StartNodeCmd = &cobra.Command{ messageValidator := validation.New( networkConfig, nodeStorage.ValidatorStore(), + nodeStorage, dutyStore, signatureVerifier, validation.WithLogger(logger), diff --git a/message/validation/errors.go b/message/validation/errors.go index 0d6aa3037b..4f68f1bf98 100644 --- a/message/validation/errors.go +++ b/message/validation/errors.go @@ -56,6 +56,7 @@ var ( ErrWrongDomain = Error{text: "wrong domain"} ErrNoShareMetadata = Error{text: "share has no metadata"} ErrUnknownValidator = Error{text: "unknown validator"} + ErrUnknownOperator = Error{text: "unknown operator"} // TODO: reject? ErrValidatorLiquidated = Error{text: "validator is liquidated"} ErrValidatorNotAttesting = Error{text: "validator is not attesting"} ErrEarlySlotMessage = Error{text: "message was sent before slot starts"} diff --git a/message/validation/validation.go b/message/validation/validation.go index 47dcafd6f4..b2bfb2fbb8 100644 --- a/message/validation/validation.go +++ b/message/validation/validation.go @@ -37,6 +37,7 @@ type messageValidator struct { consensusStateIndex map[consensusID]*consensusState consensusStateIndexMu sync.Mutex validatorStore storage.ValidatorStore + operatorStore storage.Operators dutyStore *dutystore.Store signatureVerifier signatureverifier.SignatureVerifier // TODO: use spectypes.SignatureVerifier @@ -53,6 +54,7 @@ type messageValidator struct { func New( netCfg networkconfig.NetworkConfig, validatorStore storage.ValidatorStore, + operatorStore storage.Operators, dutyStore *dutystore.Store, signatureVerifier signatureverifier.SignatureVerifier, opts ...Option, @@ -63,6 +65,7 @@ func New( consensusStateIndex: make(map[consensusID]*consensusState), validationLocks: make(map[spectypes.MessageID]*sync.Mutex), validatorStore: validatorStore, + operatorStore: operatorStore, dutyStore: dutyStore, signatureVerifier: signatureVerifier, } @@ -142,6 +145,16 @@ func (mv *messageValidator) handleSignedSSVMessage(signedSSVMessage *spectypes.S return decodedMessage, err } + exist, err := mv.operatorStore.OperatorsExist(nil, signedSSVMessage.OperatorIDs) + if err != nil { + return decodedMessage, fmt.Errorf("check operators existance: %w", err) + } + if !exist { + e := ErrUnknownOperator + e.got = signedSSVMessage.OperatorIDs + return decodedMessage, e + } + validationMu := mv.obtainValidationLock(signedSSVMessage.SSVMessage.GetID()) validationMu.Lock() diff --git a/message/validation/validation_test.go b/message/validation/validation_test.go index 1a483570ae..fb08457e84 100644 --- a/message/validation/validation_test.go +++ b/message/validation/validation_test.go @@ -117,6 +117,11 @@ func Test_ValidateSSVMessage(t *testing.T) { return nil, false }).AnyTimes() + for _, id := range shares.active.OperatorIDs() { + _, err = ns.SaveOperatorData(nil, ®istrystorage.OperatorData{ID: id}) + require.NoError(t, err) + } + signatureVerifier := signatureverifier.NewMockSignatureVerifier(ctrl) signatureVerifier.EXPECT().VerifySignature(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -132,7 +137,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Message validation happy flow, messages are not ignored or rejected and there are no errors t.Run("happy flow", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot) @@ -145,7 +150,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Make sure messages are incremented and throw an ignore message if more than 1 for a commit t.Run("message counts", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) height := specqbft.Height(slot) @@ -223,7 +228,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send a pubsub message with no data should cause an error t.Run("pubsub message has no data", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -237,7 +242,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send a pubsub message where there is too much data should cause an error t.Run("pubsub data too big", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -262,7 +267,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send a malformed pubsub message (empty message) should return an error t.Run("empty pubsub message", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -283,7 +288,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send a message with incorrect data (unable to decode incorrect message type) t.Run("bad data format", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -299,7 +304,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send a message with no data should return an error t.Run("no data", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -318,7 +323,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send a message where there is too much data should cause an error t.Run("data too big", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -339,7 +344,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send exact allowed data size amount but with invalid data (fails to decode) t.Run("data size borderline / malformed message", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -355,7 +360,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send an invalid SSV message type returns an error t.Run("invalid SSV message type", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -369,7 +374,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Generate random validator and validate it is unknown to the network t.Run("unknown validator", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -391,7 +396,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Generate random committee ID and validate it is unknown to the network t.Run("unknown committee ID", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -408,7 +413,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Make sure messages are dropped if on the incorrect network t.Run("wrong domain", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -428,7 +433,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send message with a value that refers to a non-existent role t.Run("invalid role", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -443,7 +448,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Perform validator registration or voluntary exit with a consensus type message will give an error t.Run("unexpected consensus message", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -467,7 +472,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Ignore messages related to a validator that is liquidated t.Run("liquidated validator", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -483,7 +488,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Ignore messages related to a validator with unknown state t.Run("unknown state validator", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -500,7 +505,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Ignore messages related to a validator that in pending queued state t.Run("pending queued state validator", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -518,7 +523,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Don't ignore messages related to a validator that in pending queued state (in case metadata is not updated), // but it is active (activation epoch <= current epoch) t.Run("active validator with pending queued state", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.EstimatedCurrentSlot() @@ -546,7 +551,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Unable to process a message with a validator that is not on the network t.Run("no share metadata", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -561,7 +566,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive error if more than 2 attestation duties in an epoch t.Run("too many duties", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) epoch := phase0.Epoch(1) slot := netCfg.Beacon.FirstSlotAtEpoch(epoch) @@ -616,7 +621,7 @@ func Test_ValidateSSVMessage(t *testing.T) { ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ {Slot: slot, ValidatorIndex: shares.active.ValidatorIndex + 1, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, }) - validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, ds, signatureVerifier).(*messageValidator) identifier := spectypes.NewMsgID(netCfg.DomainType, ks.ValidatorPK.Serialize(), spectypes.RoleProposer) signedSSVMessage := generateSignedMessage(ks, identifier, slot) @@ -629,7 +634,7 @@ func Test_ValidateSSVMessage(t *testing.T) { ds.Proposer.Set(epoch, []dutystore.StoreDuty[eth2apiv1.ProposerDuty]{ {Slot: slot, ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.ProposerDuty{}, InCommittee: true}, }) - validator = New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) + validator = New(netCfg, validatorStore, ns, ds, signatureVerifier).(*messageValidator) _, err = validator.handleSignedSSVMessage(signedSSVMessage, topicID, netCfg.Beacon.GetSlotStartTime(slot)) require.NoError(t, err) }) @@ -645,7 +650,7 @@ func Test_ValidateSSVMessage(t *testing.T) { ds := dutystore.New() - validator := New(mockNetworkConfig, validatorStore, ds, signatureVerifier).(*messageValidator) + validator := New(mockNetworkConfig, validatorStore, ns, ds, signatureVerifier).(*messageValidator) messages := generateRandaoMsg(ks.Shares[1], 1, epoch, currentSlot.GetSlot()) encodedMessages, err := messages.Encode() @@ -681,7 +686,7 @@ func Test_ValidateSSVMessage(t *testing.T) { ds := dutystore.New() ds.Proposer.Set(epoch, make([]dutystore.StoreDuty[eth2apiv1.ProposerDuty], 0)) - validator := New(mockNetworkConfig, validatorStore, ds, signatureVerifier).(*messageValidator) + validator := New(mockNetworkConfig, validatorStore, ns, ds, signatureVerifier).(*messageValidator) messages := generateRandaoMsg(ks.Shares[1], 1, epoch, currentSlot.GetSlot()) encodedMessages, err := messages.Encode() @@ -720,7 +725,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving message from operator who is not affiliated with the validator t.Run("signer ID not in committee", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.EstimatedCurrentSlot() @@ -746,7 +751,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving message from operator who is non-existent (operator id 0) t.Run("partial zero signer ID", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -761,7 +766,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving partial signature message from operator who is the incorrect signer t.Run("partial inconsistent signer ID", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -781,7 +786,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive error when "partialSignatureMessages" does not contain any "partialSignatureMessage" t.Run("no partial signature messages", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -799,7 +804,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive error when the partial RSA signature message is not enough bytes t.Run("partial wrong RSA signature size", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -839,7 +844,7 @@ func Test_ValidateSSVMessage(t *testing.T) { }) ds.VoluntaryExit.AddDuty(spectestingutils.TestingDutySlot, phase0.BLSPubKey(shares.active.ValidatorPubKey)) - validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, ds, signatureVerifier).(*messageValidator) messages := spectestingutils.PostConsensusAggregatorMsg(ks.Shares[1], 1) messages.Type = msgType @@ -872,7 +877,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving a message with an incorrect message type t.Run("invalid message type", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) messages := spectestingutils.PostConsensusAggregatorMsg(ks.Shares[1], 1) messages.Type = math.MaxUint64 @@ -918,7 +923,7 @@ func Test_ValidateSSVMessage(t *testing.T) { {ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.SyncCommitteeDuty{}, InCommittee: true}, }) - validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, ds, signatureVerifier).(*messageValidator) messages := spectestingutils.PostConsensusAggregatorMsg(ks.Shares[1], 1) messages.Type = msgType @@ -951,7 +956,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving QBFT message with an invalid type t.Run("invalid QBFT message type", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot, func(message *specqbft.Message) { @@ -967,7 +972,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving an incorrect signature size (too small) t.Run("wrong signature size", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot) @@ -981,7 +986,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving a message with an empty list of signers t.Run("no signers", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot) @@ -996,7 +1001,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving a message with more signers than committee size. // It tests ErrMoreSignersThanCommitteeSize from knowledge base. t.Run("more signers than committee size", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot) @@ -1017,7 +1022,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving a consensus message with zero signer t.Run("consensus zero signer", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot) @@ -1031,7 +1036,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving a message with duplicated signers t.Run("non unique signer", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateMultiSignedMessage(ks, committeeIdentifier, slot) @@ -1045,7 +1050,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving a message with non-sorted signers t.Run("signers not sorted", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateMultiSignedMessage(ks, committeeIdentifier, slot) @@ -1059,7 +1064,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving message with different amount of signers and signatures t.Run("wrong signers/signatures length", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateMultiSignedMessage(ks, committeeIdentifier, slot) @@ -1074,7 +1079,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving message from less than quorum size amount of signers t.Run("decided too few signers", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateMultiSignedMessage(ks, committeeIdentifier, slot) @@ -1090,7 +1095,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Get error when receiving a non decided message with multiple signers t.Run("non decided with multiple signers", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateMultiSignedMessage(ks, committeeIdentifier, slot, func(message *specqbft.Message) { @@ -1119,7 +1124,7 @@ func Test_ValidateSSVMessage(t *testing.T) { {ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.SyncCommitteeDuty{}, InCommittee: true}, }) - validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, ds, signatureVerifier).(*messageValidator) tests := map[spectypes.RunnerRole]time.Time{ spectypes.RoleCommittee: netCfg.Beacon.GetSlotStartTime(slot + 35), @@ -1148,7 +1153,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send early message for all roles before the duty start and receive early message error t.Run("early message", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot) @@ -1162,7 +1167,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send message from non-leader acting as a leader should receive an error t.Run("not a leader", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot) @@ -1176,7 +1181,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send wrong size of data (8 bytes) for a prepare justification message should receive an error t.Run("malformed prepare justification", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) signedSSVMessage := generateSignedMessage(ks, committeeIdentifier, slot, func(message *specqbft.Message) { @@ -1193,7 +1198,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send prepare justification message without a proposal message should receive an error t.Run("non-proposal with prepare justification", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1216,7 +1221,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send round change justification message without a proposal message should receive an error t.Run("non-proposal with round change justification", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1239,7 +1244,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send round change justification message with a malformed message (1 byte) should receive an error t.Run("malformed round change justification", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1257,7 +1262,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send message root hash that doesn't match the expected root hash should receive an error t.Run("wrong root hash", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1274,7 +1279,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive proposal from same operator twice with different messages (same round) should receive an error t.Run("double proposal with different data", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1299,7 +1304,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive prepare from same operator twice with different messages (same round) should receive an error t.Run("double prepare", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1322,7 +1327,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive commit from same operator twice with different messages (same round) should receive an error t.Run("double commit", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1344,7 +1349,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive round change from same operator twice with different messages (same round) should receive an error t.Run("double round change", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1366,7 +1371,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Decided with same signers should receive an error t.Run("decided with same signers", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1387,7 +1392,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send message with a slot lower than in the previous message t.Run("slot already advanced", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1410,7 +1415,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Send message with a round lower than in the previous message t.Run("round already advanced", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1444,7 +1449,7 @@ func Test_ValidateSSVMessage(t *testing.T) { {ValidatorIndex: shares.active.ValidatorIndex, Duty: ð2apiv1.SyncCommitteeDuty{}, InCommittee: true}, }) - validator := New(netCfg, validatorStore, ds, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, ds, signatureVerifier).(*messageValidator) tests := map[spectypes.RunnerRole]specqbft.Round{ spectypes.RoleCommittee: 13, @@ -1489,7 +1494,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive an event message from an operator that is not myself should receive an error t.Run("event message", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1505,7 +1510,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a unknown message type from an operator that is not myself should receive an error t.Run("unknown type message", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1522,7 +1527,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a message with a wrong signature t.Run("wrong signature", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, wrongSignatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, wrongSignatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1537,7 +1542,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a message with an incorrect topic t.Run("incorrect topic", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1552,7 +1557,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive nil signed ssv message t.Run("nil signed ssv message", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1564,7 +1569,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive nil ssv message t.Run("nil ssv message", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1579,7 +1584,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive zero round t.Run("zero round", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1595,7 +1600,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a message with no signatures t.Run("no signatures", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1610,7 +1615,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a message with mismatched identifier t.Run("mismatched identifier", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1628,7 +1633,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a prepare/commit message with FullData t.Run("prepare/commit with FullData", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1650,7 +1655,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a non-consensus message with FullData t.Run("non-consensus with FullData", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1667,7 +1672,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a partial signature message with multiple signers t.Run("partial signature with multiple signers", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1685,7 +1690,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a partial signature message with too many signers t.Run("partial signature with too many messages", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1713,7 +1718,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a partial signature message with triple validator index t.Run("partial signature with triple validator index", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) @@ -1741,7 +1746,7 @@ func Test_ValidateSSVMessage(t *testing.T) { // Receive a partial signature message with validator index mismatch t.Run("partial signature with validator index mismatch", func(t *testing.T) { - validator := New(netCfg, validatorStore, dutyStore, signatureVerifier).(*messageValidator) + validator := New(netCfg, validatorStore, ns, dutyStore, signatureVerifier).(*messageValidator) slot := netCfg.Beacon.FirstSlotAtEpoch(1) diff --git a/network/p2p/test_utils.go b/network/p2p/test_utils.go index 3153b96996..4d1a4c310d 100644 --- a/network/p2p/test_utils.go +++ b/network/p2p/test_utils.go @@ -182,6 +182,7 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex uint64, key cfg.MessageValidator = validation.New( networkconfig.TestNetwork, nodeStorage.ValidatorStore(), + nodeStorage, dutyStore, signatureVerifier, ) @@ -207,6 +208,7 @@ func (ln *LocalNet) NewTestP2pNetwork(ctx context.Context, nodeIndex uint64, key cfg.MessageValidator = validation.New( networkconfig.TestNetwork, nodeStorage.ValidatorStore(), + nodeStorage, dutyStore, signatureVerifier, validation.WithSelfAccept(selfPeerID, true), diff --git a/network/topics/controller_test.go b/network/topics/controller_test.go index 5d939320bd..993893a8a8 100644 --- a/network/topics/controller_test.go +++ b/network/topics/controller_test.go @@ -30,6 +30,7 @@ import ( "github.com/ssvlabs/ssv/network/discovery" "github.com/ssvlabs/ssv/networkconfig" "github.com/ssvlabs/ssv/operator/duties/dutystore" + "github.com/ssvlabs/ssv/operator/storage" registrystorage "github.com/ssvlabs/ssv/registry/storage" "github.com/ssvlabs/ssv/registry/storage/mocks" "github.com/ssvlabs/ssv/storage/basedb" @@ -82,7 +83,13 @@ func TestTopicManager(t *testing.T) { validatorStore := mocks.NewMockValidatorStore(ctrl) signatureVerifier := signatureverifier.NewMockSignatureVerifier(ctrl) - validator := validation.New(networkconfig.TestNetwork, validatorStore, dutyStore, signatureVerifier) + db, err := kv.NewInMemory(logger, basedb.Options{}) + require.NoError(t, err) + + nodeStorage, err := storage.NewNodeStorage(logger, db) + require.NoError(t, err) + + validator := validation.New(networkconfig.TestNetwork, validatorStore, nodeStorage, dutyStore, signatureVerifier) scoreMap := map[peer.ID]*pubsub.PeerScoreSnapshot{} var scoreMapMu sync.Mutex diff --git a/network/topics/msg_validator_test.go b/network/topics/msg_validator_test.go index 30e48fd7ad..1d89831ef4 100644 --- a/network/topics/msg_validator_test.go +++ b/network/topics/msg_validator_test.go @@ -54,7 +54,7 @@ func TestMsgValidator(t *testing.T) { committeeID := share.CommitteeID() signatureVerifier := signatureverifier.NewSignatureVerifier(ns) - mv := validation.New(networkconfig.TestNetwork, ns.ValidatorStore(), dutystore.New(), signatureVerifier) + mv := validation.New(networkconfig.TestNetwork, ns.ValidatorStore(), ns, dutystore.New(), signatureVerifier) require.NotNil(t, mv) slot := networkconfig.TestNetwork.Beacon.GetBeaconNetwork().EstimatedCurrentSlot()