From a9596807fd545494f35089a2c8fd61cd1e357830 Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Tue, 17 Dec 2024 17:29:26 +0200 Subject: [PATCH 01/12] draft --- registry/storage/shares_test.go | 542 ++++++++++++++++-------- registry/storage/validatorstore.go | 210 ++++++--- registry/storage/validatorstore_test.go | 251 ++++++----- 3 files changed, 678 insertions(+), 325 deletions(-) diff --git a/registry/storage/shares_test.go b/registry/storage/shares_test.go index fde107b290..8e2d365cae 100644 --- a/registry/storage/shares_test.go +++ b/registry/storage/shares_test.go @@ -2,11 +2,17 @@ package storage import ( "bytes" + "context" "encoding/hex" + "encoding/json" "fmt" + "slices" "sort" "strconv" + "strings" + "sync" "testing" + "time" eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -15,6 +21,7 @@ import ( spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/stretchr/testify/require" "go.uber.org/zap" + "golang.org/x/exp/maps" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/networkconfig" @@ -25,9 +32,11 @@ import ( "github.com/ssvlabs/ssv/utils/threshold" ) -func TestValidatorSerializer(t *testing.T) { +func init() { threshold.Init() +} +func TestValidatorSerializer(t *testing.T) { sk := &bls.SecretKey{} sk.SetByCSPRNG() @@ -89,7 +98,7 @@ func TestSharesStorage(t *testing.T) { require.NoError(t, err) } - validatorShare, _ := generateRandomValidatorSpecShare(splitKeys) + validatorShare, _ := generateRandomShare(splitKeys) validatorShare.Metadata = ssvtypes.Metadata{ BeaconMetadata: &beaconprotocol.ValidatorMetadata{ Balance: 1, @@ -102,7 +111,7 @@ func TestSharesStorage(t *testing.T) { } require.NoError(t, storage.Shares.Save(nil, validatorShare)) - validatorShare2, _ := generateRandomValidatorSpecShare(splitKeys) + validatorShare2, _ := generateRandomShare(splitKeys) require.NoError(t, storage.Shares.Save(nil, validatorShare2)) validatorShareByKey, exists := storage.Shares.Get(nil, validatorShare.ValidatorPubKey[:]) @@ -187,9 +196,307 @@ func TestSharesStorage(t *testing.T) { }) } -func generateRandomValidatorStorageShare(splitKeys map[uint64]*bls.SecretKey) (*storageShare, *bls.SecretKey) { - threshold.Init() +func TestShareDeletionHandlesValidatorStoreCorrectly(t *testing.T) { + logger := logging.TestLogger(t) + + // Test share deletion with and without reopening the database. + testWithStorageReopen(t, func(t *testing.T, storage *testStorage, reopen func(t *testing.T)) { + // Generate and save a random validator share + validatorShare := fakeParticipatingShare(1, generateRandomPubKey(), []uint64{1, 2, 3, 4}) + require.NoError(t, storage.Shares.Save(nil, validatorShare)) + reopen(t) + + // Ensure the share is saved correctly + savedShare, exists := storage.Shares.Get(nil, validatorShare.ValidatorPubKey[:]) + require.True(t, exists) + require.NotNil(t, savedShare) + + // Ensure the share is saved correctly in the validatorStore + validatorShareFromStore, exists := storage.ValidatorStore.Validator(validatorShare.ValidatorPubKey[:]) + require.True(t, exists) + require.NotNil(t, validatorShareFromStore) + + // Verify that other internal mappings are updated accordingly + requireValidatorStoreIntegrity(t, storage.ValidatorStore, []*ssvtypes.SSVShare{validatorShare}) + + // Delete the share from storage + require.NoError(t, storage.Shares.Delete(nil, validatorShare.ValidatorPubKey[:])) + reopen(t) + + // Verify that the share is deleted from shareStorage + deletedShare, exists := storage.Shares.Get(nil, validatorShare.ValidatorPubKey[:]) + require.False(t, exists) + require.Nil(t, deletedShare, "Share should be deleted from shareStorage") + + // Verify that the validatorStore reflects the removal correctly + removedShare, exists := storage.ValidatorStore.Validator(validatorShare.ValidatorPubKey[:]) + require.False(t, exists) + require.Nil(t, removedShare, "Share should be removed from validator store after deletion") + + // Further checks on internal data structures + committee, exists := storage.ValidatorStore.Committee(validatorShare.CommitteeID()) + require.False(t, exists) + require.Nil(t, committee, "Committee should be nil after share deletion") + + // Verify that other internal mappings are updated accordingly + byIndex, exists := storage.ValidatorStore.ValidatorByIndex(validatorShare.Metadata.BeaconMetadata.Index) + require.False(t, exists) + require.Nil(t, byIndex) + for _, operator := range validatorShare.Committee { + shares := storage.ValidatorStore.OperatorValidators(operator.Signer) + require.Empty(t, shares, "Data for operator should be nil after share deletion") + } + require.Empty(t, storage.ValidatorStore.OperatorValidators(100)) + require.Empty(t, storage.ValidatorStore.Committees()) + + // Cleanup the share storage for the next test + require.NoError(t, storage.Shares.Drop()) + reopen(t) + validators := storage.Shares.List(nil) + require.EqualValues(t, 0, len(validators), "No validators should be left in storage after drop") + requireValidatorStoreIntegrity(t, storage.ValidatorStore, []*ssvtypes.SSVShare{}) + }) + + t.Run("share_gone_after_db_recreation", func(t *testing.T) { + storage, err := newTestStorage(logger) + require.NoError(t, err) + defer storage.Close() + + validatorShare := fakeParticipatingShare(1, generateRandomPubKey(), []uint64{1, 2, 3, 4}) + require.NoError(t, storage.Shares.Save(nil, validatorShare)) + + requireValidatorStoreIntegrity(t, storage.ValidatorStore, []*ssvtypes.SSVShare{validatorShare}) + + require.NoError(t, storage.Recreate(logger)) + + requireValidatorStoreIntegrity(t, storage.ValidatorStore, []*ssvtypes.SSVShare{}) + }) +} + +func TestValidatorStoreThroughSharesStorage(t *testing.T) { + testWithStorageReopen(t, func(t *testing.T, storage *testStorage, reopen func(t *testing.T)) { + // Generate and save a random validator share + testShare := fakeParticipatingShare(1, generateRandomPubKey(), []uint64{1, 2, 3, 4}) + require.NoError(t, storage.Shares.Save(nil, testShare)) + reopen(t) + + // Try saving nil share/shares + require.Error(t, storage.Shares.Save(nil, nil)) + require.Error(t, storage.Shares.Save(nil, nil, testShare)) + require.Error(t, storage.Shares.Save(nil, testShare, nil)) + + // Ensure the share is saved correctly + savedShare, exists := storage.Shares.Get(nil, testShare.ValidatorPubKey[:]) + require.True(t, exists) + require.NotNil(t, savedShare) + + // Verify that the validatorStore has the share via SharesStorage + storedShare, exists := storage.ValidatorStore.Validator(testShare.ValidatorPubKey[:]) + require.True(t, exists) + require.NotNil(t, storedShare, "Share should be present in validator store after adding to sharesStorage") + requireValidatorStoreIntegrity(t, storage.ValidatorStore, []*ssvtypes.SSVShare{testShare}) + + // Now update the share + updatedMetadata := &beaconprotocol.ValidatorMetadata{ + Balance: 5000, + Status: eth2apiv1.ValidatorStateActiveOngoing, + Index: 3, + ActivationEpoch: 5, + } + + // Update the share with new metadata + require.NoError(t, storage.Shares.UpdateValidatorsMetadata(map[spectypes.ValidatorPK]*beaconprotocol.ValidatorMetadata{ + testShare.ValidatorPubKey: updatedMetadata, + })) + reopen(t) + + // Ensure the updated share is reflected in validatorStore + updatedShare, exists := storage.ValidatorStore.Validator(testShare.ValidatorPubKey[:]) + require.True(t, exists) + require.NotNil(t, updatedShare, "Updated share should be present in validator store") + require.Equal(t, updatedMetadata, updatedShare.BeaconMetadata, "Validator metadata should be updated in validator store") + + // Remove the share via SharesStorage + require.NoError(t, storage.Shares.Delete(nil, testShare.ValidatorPubKey[:])) + reopen(t) + + // Verify that the share is removed from both sharesStorage and validatorStore + deletedShare, exists := storage.Shares.Get(nil, testShare.ValidatorPubKey[:]) + require.False(t, exists) + require.Nil(t, deletedShare, "Share should be deleted from sharesStorage") + + removedShare, exists := storage.ValidatorStore.Validator(testShare.ValidatorPubKey[:]) + require.False(t, exists) + require.Nil(t, removedShare, "Share should be removed from validator store after deletion in sharesStorage") + }) +} + +// Test various edge cases where operators have multiple committees. +func TestShareStorage_MultipleCommittees(t *testing.T) { + testWithStorageReopen(t, func(t *testing.T, storage *testStorage, reopen func(t *testing.T)) { + shares := map[phase0.ValidatorIndex]*ssvtypes.SSVShare{} + saveAndVerify := func(s ...*ssvtypes.SSVShare) { + require.NoError(t, storage.Shares.Save(nil, s...)) + reopen(t) + for _, share := range s { + shares[share.ValidatorIndex] = share + } + requireValidatorStoreIntegrity(t, storage.ValidatorStore, maps.Values(shares)) + } + deleteAndVerify := func(share *ssvtypes.SSVShare) { + require.NoError(t, storage.Shares.Delete(nil, share.ValidatorPubKey[:])) + reopen(t) + delete(shares, share.ValidatorIndex) + requireValidatorStoreIntegrity(t, storage.ValidatorStore, maps.Values(shares)) + } + + share1 := fakeParticipatingShare(1, generateRandomPubKey(), []uint64{1, 2, 3, 4}) + share2 := fakeParticipatingShare(2, generateRandomPubKey(), []uint64{1, 2, 3, 4}) + share3 := fakeParticipatingShare(3, generateRandomPubKey(), []uint64{3, 4, 5, 6}) + share4 := fakeParticipatingShare(4, generateRandomPubKey(), []uint64{9, 10, 11, 12}) + saveAndVerify(share1, share2, share3, share4) + + // Test that an exclusive committee with only 1 validator is removed then re-added + // for operators that also have other committees (edgecase). + deleteAndVerify(share3) + saveAndVerify(share3) + + // Test that a committee with multiple validators is not removed. + deleteAndVerify(share2) + + // Test that a committee with multiple validators is removed when all committee validators are removed. + deleteAndVerify(share1) + + // Test that ValidatorStore is empty after all validators are removed. + deleteAndVerify(share3) + deleteAndVerify(share4) + require.Empty(t, storage.ValidatorStore.Validators()) + require.Empty(t, storage.ValidatorStore.Committees()) + require.Empty(t, storage.ValidatorStore.OperatorValidators(1)) + + // Re-add share2 to test that ValidatorStore is updated correctly. + saveAndVerify(share2) + }) +} + +func TestSharesStorage_HighContentionConcurrency(t *testing.T) { + logger := logging.TestLogger(t) + storage, err := newTestStorage(logger) + require.NoError(t, err) + defer storage.Close() + + share1 := fakeParticipatingShare(1, generateRandomPubKey(), []uint64{1, 2, 3, 4}) + share2 := fakeParticipatingShare(2, generateRandomPubKey(), []uint64{1, 2, 3, 4}) + share3 := fakeParticipatingShare(3, generateRandomPubKey(), []uint64{3, 4, 5, 6}) + share4 := fakeParticipatingShare(4, generateRandomPubKey(), []uint64{9, 10, 11, 12}) + + // High contention test with concurrent read, add, update, and remove + var wg sync.WaitGroup + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + for i := 0; i < 100; i++ { + for _, op := range []string{"add", "update", "remove1", "remove4", "read"} { + wg.Add(1) + go func() { + defer wg.Done() + for ctx.Err() == nil { + switch op { + case "add": + require.NoError(t, storage.Shares.Save(nil, share1, share2, share3, share4)) + case "update": + require.NoError(t, storage.Shares.UpdateValidatorsMetadata(map[spectypes.ValidatorPK]*beaconprotocol.ValidatorMetadata{ + share2.ValidatorPubKey: updatedShare2.BeaconMetadata, + })) + case "remove1": + require.NoError(t, storage.Shares.Delete(nil, share1.ValidatorPubKey[:])) + case "remove4": + require.NoError(t, storage.Shares.Delete(nil, share4.ValidatorPubKey[:])) + case "read": + _, _ = storage.ValidatorStore.Validator(share1.ValidatorPubKey[:]) + _, _ = storage.ValidatorStore.Committee(share1.CommitteeID()) + _ = storage.ValidatorStore.Validators() + _ = storage.ValidatorStore.Committees() + } + } + }() + } + } + wg.Wait() + + t.Run("validate high contention state", func(t *testing.T) { + // Check that the store is consistent and valid after high contention + require.NotPanics(t, func() { + storage.ValidatorStore.Validators() + storage.ValidatorStore.Committees() + storage.ValidatorStore.OperatorValidators(1) + storage.ValidatorStore.OperatorCommittees(1) + }) + + // Verify that share2 and share3 are in the validator store (only share1 and share4 are potentially removed). + share2InStore, exists := storage.ValidatorStore.ValidatorByIndex(share2.ValidatorIndex) + require.True(t, exists) + require.NotNil(t, share2InStore) + requireEqualShare(t, share2, share2InStore) + + share3InStore, exists := storage.ValidatorStore.ValidatorByIndex(share3.ValidatorIndex) + require.True(t, exists) + require.NotNil(t, share3InStore) + requireEqualShare(t, share3, share3InStore) + + // Integrity check. + requireValidatorStoreIntegrity(t, storage.ValidatorStore, storage.Shares.List(nil)) + }) +} + +// Runs the given function as a test with and without storage reopen. +func testWithStorageReopen(t *testing.T, f func(t *testing.T, storage *testStorage, reopen func(t *testing.T))) { + for _, withReopen := range []bool{false, true} { + withReopen := withReopen + t.Run(fmt.Sprintf("withReopen=%t", withReopen), func(t *testing.T) { + logger := logging.TestLogger(t) + storage, err := newTestStorage(logger) + require.NoError(t, err) + defer storage.Close() + + reopen := func(t *testing.T) { + if withReopen { + require.NoError(t, storage.Reopen(logger)) + } + } + f(t, storage, reopen) + }) + } +} + +func requireEqualShare(t *testing.T, expected, actual *ssvtypes.SSVShare, msgAndArgs ...any) { + b1, err := json.Marshal(expected) + require.NoError(t, err) + b2, err := json.Marshal(actual) + require.NoError(t, err) + require.JSONEq(t, string(b1), string(b2), msgAndArgs...) +} + +func requireEqualShares(t *testing.T, expected, actual []*ssvtypes.SSVShare, msgAndArgs ...any) { + require.Equal(t, len(expected), len(actual), msgAndArgs...) + + // Sort shares by validator pubkey. + expectedSorted := make([]*ssvtypes.SSVShare, len(expected)) + copy(expectedSorted, expected) + slices.SortFunc(expectedSorted, func(a, b *ssvtypes.SSVShare) int { + return strings.Compare(string(a.ValidatorPubKey[:]), string(b.ValidatorPubKey[:])) + }) + actualSorted := make([]*ssvtypes.SSVShare, len(actual)) + copy(actualSorted, actual) + slices.SortFunc(actual, func(a, b *ssvtypes.SSVShare) int { + return strings.Compare(string(a.ValidatorPubKey[:]), string(b.ValidatorPubKey[:])) + }) + for i, share := range expectedSorted { + requireEqualShare(t, share, actual[i], msgAndArgs...) + } +} + +func generateRandomValidatorStorageShare(splitKeys map[uint64]*bls.SecretKey) (*storageShare, *bls.SecretKey) { sk1 := bls.SecretKey{} sk1.SetByCSPRNG() @@ -234,9 +541,7 @@ func generateRandomValidatorStorageShare(splitKeys map[uint64]*bls.SecretKey) (* }, &sk1 } -func generateRandomValidatorSpecShare(splitKeys map[uint64]*bls.SecretKey) (*ssvtypes.SSVShare, *bls.SecretKey) { - threshold.Init() - +func generateRandomShare(splitKeys map[uint64]*bls.SecretKey) (*ssvtypes.SSVShare, *bls.SecretKey) { sk1 := bls.SecretKey{} sk1.SetByCSPRNG() @@ -257,6 +562,7 @@ func generateRandomValidatorSpecShare(splitKeys map[uint64]*bls.SecretKey) (*ssv return &ssvtypes.SSVShare{ Share: spectypes.Share{ ValidatorPubKey: spectypes.ValidatorPK(sk1.GetPublicKey().Serialize()), + ValidatorIndex: 3, SharePubKey: sk2.GetPublicKey().Serialize(), Committee: ibftCommittee, DomainType: networkconfig.TestNetwork.DomainType, @@ -276,6 +582,48 @@ func generateRandomValidatorSpecShare(splitKeys map[uint64]*bls.SecretKey) (*ssv }, &sk1 } +func generateRandomPubKey() spectypes.ValidatorPK { + sk := &bls.SecretKey{} + sk.SetByCSPRNG() + return spectypes.ValidatorPK(sk.GetPublicKey().Serialize()) +} + +func fakeParticipatingShare(index phase0.ValidatorIndex, pk spectypes.ValidatorPK, operatorIDs []uint64) *ssvtypes.SSVShare { + committee := make([]*spectypes.ShareMember, len(operatorIDs)) + for i, operatorID := range operatorIDs { + sharePubkey := make(spectypes.ShareValidatorPK, len(pk)) + spk := generateRandomPubKey() + copy(sharePubkey, spk[:]) + + committee[i] = &spectypes.ShareMember{ + Signer: operatorID, + SharePubKey: sharePubkey, + } + } + + return &ssvtypes.SSVShare{ + Share: spectypes.Share{ + ValidatorPubKey: pk, + ValidatorIndex: index, + SharePubKey: committee[0].SharePubKey, + Committee: committee, + DomainType: networkconfig.TestNetwork.DomainType, + FeeRecipientAddress: common.HexToAddress("0xFeedB14D8b2C76FdF808C29818b06b830E8C2c0e"), + Graffiti: bytes.Repeat([]byte{0x01}, 32), + }, + Metadata: ssvtypes.Metadata{ + BeaconMetadata: &beaconprotocol.ValidatorMetadata{ + Balance: 1, + Status: eth2apiv1.ValidatorStateActiveOngoing, + Index: index, + ActivationEpoch: 4, + }, + OwnerAddress: common.HexToAddress("0xFeedB14D8b2C76FdF808C29818b06b830E8C2c0e"), + Liquidated: false, + }, + } +} + func generateMaxPossibleShare() (*storageShare, error) { threshold.Init() @@ -326,176 +674,18 @@ func (t *testStorage) Reopen(logger *zap.Logger) error { return t.open(logger) } -func (t *testStorage) Close() error { - return t.db.Close() -} - -func TestShareDeletionHandlesValidatorStoreCorrectly(t *testing.T) { - logger := logging.TestLogger(t) - storage, err := newTestStorage(logger) - require.NoError(t, err) - defer storage.Close() - - // Initialize threshold and generate keys for test setup - threshold.Init() - const keysCount = 4 - - sk := &bls.SecretKey{} - sk.SetByCSPRNG() - - splitKeys, err := threshold.Create(sk.Serialize(), keysCount-1, keysCount) - require.NoError(t, err) - - // Save operators to the storage - for operatorID := range splitKeys { - _, err = storage.Operators.SaveOperatorData(nil, &OperatorData{ID: operatorID, PublicKey: []byte(strconv.FormatUint(operatorID, 10))}) - require.NoError(t, err) +func (t *testStorage) Recreate(logger *zap.Logger) error { + err := t.Close() + if err != nil { + return err } - - // Test share deletion with and without reopening the database. - for _, withReopen := range []bool{true, false} { - t.Run(fmt.Sprintf("withReopen=%t", withReopen), func(t *testing.T) { - // Generate and save a random validator share - validatorShare, _ := generateRandomValidatorSpecShare(splitKeys) - require.NoError(t, storage.Shares.Save(nil, validatorShare)) - if withReopen { - require.NoError(t, storage.Reopen(logger)) - } - - // Ensure the share is saved correctly - savedShare, exists := storage.Shares.Get(nil, validatorShare.ValidatorPubKey[:]) - require.True(t, exists) - require.NotNil(t, savedShare) - - // Ensure the share is saved correctly in the validatorStore - validatorShareFromStore, exists := storage.ValidatorStore.Validator(validatorShare.ValidatorPubKey[:]) - require.True(t, exists) - require.NotNil(t, validatorShareFromStore) - - // Delete the share from storage - require.NoError(t, storage.Shares.Delete(nil, validatorShare.ValidatorPubKey[:])) - if withReopen { - require.NoError(t, storage.Reopen(logger)) - } - - // Verify that the share is deleted from shareStorage - deletedShare, exists := storage.Shares.Get(nil, validatorShare.ValidatorPubKey[:]) - require.False(t, exists) - require.Nil(t, deletedShare, "Share should be deleted from shareStorage") - - // Verify that the validatorStore reflects the removal correctly - removedShare, exists := storage.ValidatorStore.Validator(validatorShare.ValidatorPubKey[:]) - require.False(t, exists) - require.Nil(t, removedShare, "Share should be removed from validator store after deletion") - - // Further checks on internal data structures - committeeID := validatorShare.CommitteeID() - committee, exists := storage.ValidatorStore.Committee(committeeID) - require.False(t, exists) - require.Nil(t, committee, "Committee should be nil after share deletion") - - // Verify that other internal mappings are updated accordingly - for _, operator := range validatorShare.Committee { - shares := storage.ValidatorStore.OperatorValidators(operator.Signer) - require.Empty(t, shares, "Data for operator should be nil after share deletion") - } - - // Cleanup the share storage for the next test - require.NoError(t, storage.Shares.Drop()) - if withReopen { - require.NoError(t, storage.Reopen(logger)) - } - validators := storage.Shares.List(nil) - require.EqualValues(t, 0, len(validators), "No validators should be left in storage after drop") - }) + t.db, err = kv.NewInMemory(logger, basedb.Options{}) + if err != nil { + return err } + return t.open(logger) } -func TestValidatorStoreThroughSharesStorage(t *testing.T) { - logger := logging.TestLogger(t) - storage, err := newTestStorage(logger) - require.NoError(t, err) - defer storage.Close() - - // Initialize threshold and generate keys for test setup - threshold.Init() - const keysCount = 4 - - sk := &bls.SecretKey{} - sk.SetByCSPRNG() - - splitKeys, err := threshold.Create(sk.Serialize(), keysCount-1, keysCount) - require.NoError(t, err) - - // Save operators to the storage - for operatorID := range splitKeys { - _, err = storage.Operators.SaveOperatorData(nil, &OperatorData{ID: operatorID, PublicKey: []byte(strconv.FormatUint(operatorID, 10))}) - require.NoError(t, err) - } - - for _, withReopen := range []bool{true, false} { - t.Run(fmt.Sprintf("withReopen=%t", withReopen), func(t *testing.T) { - // Generate and save a random validator share - validatorShare, _ := generateRandomValidatorSpecShare(splitKeys) - require.NoError(t, storage.Shares.Save(nil, validatorShare)) - if withReopen { - require.NoError(t, storage.Reopen(logger)) - } - - // Try saving nil share/shares - require.Error(t, storage.Shares.Save(nil, nil)) - require.Error(t, storage.Shares.Save(nil, nil, validatorShare)) - require.Error(t, storage.Shares.Save(nil, validatorShare, nil)) - if withReopen { - require.NoError(t, storage.Reopen(logger)) - } - - // Ensure the share is saved correctly - savedShare, exists := storage.Shares.Get(nil, validatorShare.ValidatorPubKey[:]) - require.True(t, exists) - require.NotNil(t, savedShare) - - // Verify that the validatorStore has the share via SharesStorage - storedShare, exists := storage.ValidatorStore.Validator(validatorShare.ValidatorPubKey[:]) - require.True(t, exists) - require.NotNil(t, storedShare, "Share should be present in validator store after adding to sharesStorage") - - // Now update the share - updatedMetadata := &beaconprotocol.ValidatorMetadata{ - Balance: 5000, - Status: eth2apiv1.ValidatorStateActiveOngoing, - Index: 3, - ActivationEpoch: 5, - } - - // Update the share with new metadata - require.NoError(t, storage.Shares.UpdateValidatorsMetadata(map[spectypes.ValidatorPK]*beaconprotocol.ValidatorMetadata{ - validatorShare.ValidatorPubKey: updatedMetadata, - })) - if withReopen { - require.NoError(t, storage.Reopen(logger)) - } - - // Ensure the updated share is reflected in validatorStore - updatedShare, exists := storage.ValidatorStore.Validator(validatorShare.ValidatorPubKey[:]) - require.True(t, exists) - require.NotNil(t, updatedShare, "Updated share should be present in validator store") - require.Equal(t, updatedMetadata, updatedShare.BeaconMetadata, "Validator metadata should be updated in validator store") - - // Remove the share via SharesStorage - require.NoError(t, storage.Shares.Delete(nil, validatorShare.ValidatorPubKey[:])) - if withReopen { - require.NoError(t, storage.Reopen(logger)) - } - - // Verify that the share is removed from both sharesStorage and validatorStore - deletedShare, exists := storage.Shares.Get(nil, validatorShare.ValidatorPubKey[:]) - require.False(t, exists) - require.Nil(t, deletedShare, "Share should be deleted from sharesStorage") - - removedShare, exists := storage.ValidatorStore.Validator(validatorShare.ValidatorPubKey[:]) - require.False(t, exists) - require.Nil(t, removedShare, "Share should be removed from validator store after deletion in sharesStorage") - }) - } +func (t *testStorage) Close() error { + return t.db.Close() } diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 27f2c59d0e..263f9d7990 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -1,6 +1,7 @@ package storage import ( + "encoding/hex" "fmt" "slices" "sync" @@ -236,24 +237,39 @@ func (c *validatorStore) handleSharesAdded(shares ...*types.SSVShare) error { } // Update byCommitteeID - committee := c.byCommitteeID[share.CommitteeID()] - if committee == nil { - committee = buildCommittee([]*types.SSVShare{share}) - } else { - committee = buildCommittee(append(committee.Validators, share)) + committeeID := share.CommitteeID() + committee, exists := c.byCommitteeID[committeeID] + + if !exists { + committee = &Committee{ + ID: committeeID, + Operators: []spectypes.OperatorID{}, + Validators: []*types.SSVShare{}, + Indices: []phase0.ValidatorIndex{}, + } + } + + if !containsShare(committee.Validators, share) { + committee.Validators = append(committee.Validators, share) + committee.Indices = append(committee.Indices, share.ValidatorIndex) } + + addNewCommitteeOperators(committee, share.Committee) c.byCommitteeID[committee.ID] = committee // Update byOperatorID for _, operator := range share.Committee { - data := c.byOperatorID[operator.Signer] - if data == nil { + data, exists := c.byOperatorID[operator.Signer] + if !exists { data = &sharesAndCommittees{ - shares: []*types.SSVShare{share}, - committees: []*Committee{committee}, + shares: []*types.SSVShare{}, + committees: []*Committee{}, } - } else { - data.shares = append(data.shares, share) + } + + data.shares = append(data.shares, share) + + if !containsCommittee(data.committees, committee.ID) { data.committees = append(data.committees, committee) } @@ -278,40 +294,44 @@ func (c *validatorStore) handleShareRemoved(share *types.SSVShare) error { } // Update byCommitteeID - committee := c.byCommitteeID[share.CommitteeID()] - if committee != nil { - validators := make([]*types.SSVShare, 0, len(committee.Validators)-1) - indices := make([]phase0.ValidatorIndex, 0, len(committee.Validators)-1) - for _, validator := range committee.Validators { - if validator.ValidatorPubKey != share.ValidatorPubKey { - validators = append(validators, validator) - indices = append(indices, validator.ValidatorIndex) - } - } - if len(validators) == 0 { - delete(c.byCommitteeID, committee.ID) - } else { - committee.Validators = validators - committee.Indices = indices - } + committeeID := share.CommitteeID() + committee, exists := c.byCommitteeID[committeeID] + if !exists { + return fmt.Errorf("committee not found. id=%s", hex.EncodeToString(committeeID[:])) + } + + if !removeShareFromCommittee(committee, share) { + return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", + hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committeeID[:])) + } + + committeeRemoved := len(committee.Validators) == 0 + if committeeRemoved { + delete(c.byCommitteeID, committee.ID) } // Update byOperatorID for _, operator := range share.Committee { - data := c.byOperatorID[operator.Signer] - if data != nil { - shares := make([]*types.SSVShare, 0, len(data.shares)-1) - for _, s := range data.shares { - if s.ValidatorPubKey != share.ValidatorPubKey { - shares = append(shares, s) - } - } - if len(shares) == 0 { - delete(c.byOperatorID, operator.Signer) - } else { - data.shares = shares + data, exists := c.byOperatorID[operator.Signer] + if !exists { + return fmt.Errorf("operator not found. operator_id=%d", operator.Signer) + } + + if !removeShareFromOperator(data, share) { + return fmt.Errorf("share not found in operator. validator_pubkey=%s operator_id=%d", + hex.EncodeToString(share.ValidatorPubKey[:]), operator.Signer) + } + + if committeeRemoved { + if !removeCommitteeFromOperator(data, committee.ID) { + return fmt.Errorf("committee not found in operator. committee_id=%s operator_id=%d", + hex.EncodeToString(committeeID[:]), operator.Signer) } } + + if len(data.shares) == 0 { + delete(c.byOperatorID, operator.Signer) + } } return nil @@ -325,34 +345,52 @@ func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) error { if share == nil { return fmt.Errorf("nil share") } + // Update byValidatorIndex if share.HasBeaconMetadata() { c.byValidatorIndex[share.BeaconMetadata.Index] = share } // Update byCommitteeID - committee := c.byCommitteeID[share.CommitteeID()] - if committee != nil { - for i, validator := range committee.Validators { - if validator.ValidatorPubKey == share.ValidatorPubKey { - committee.Validators[i] = share - committee.Indices[i] = share.ValidatorIndex - break - } + committeeID := share.CommitteeID() + committee, exists := c.byCommitteeID[committeeID] + if !exists { + return fmt.Errorf("committee not found. id=%s", hex.EncodeToString(committeeID[:])) + } + + shareUpdated := false + for i, validator := range committee.Validators { + if validator.ValidatorPubKey == share.ValidatorPubKey { + committee.Validators[i] = share + committee.Indices[i] = share.ValidatorIndex + shareUpdated = true + break } } + if !shareUpdated { + return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", + hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committeeID[:])) + } // Update byOperatorID for _, shareMember := range share.Committee { - data := c.byOperatorID[shareMember.Signer] - if data != nil { - for i, s := range data.shares { - if s.ValidatorPubKey == share.ValidatorPubKey { - data.shares[i] = share - break - } + data, exists := c.byOperatorID[shareMember.Signer] + if !exists { + return fmt.Errorf("operator not found. operator_id=%d", shareMember.Signer) + } + + shareUpdated = false + for i, s := range data.shares { + if s.ValidatorPubKey == share.ValidatorPubKey { + data.shares[i] = share + shareUpdated = true + break } } + if !shareUpdated { + return fmt.Errorf("share not found in operator. validator_pubkey=%s operator_id=%d", + hex.EncodeToString(share.ValidatorPubKey[:]), shareMember.Signer) + } } } @@ -368,6 +406,70 @@ func (c *validatorStore) handleDrop() { c.byOperatorID = make(map[spectypes.OperatorID]*sharesAndCommittees) } +func containsShare(shares []*types.SSVShare, share *types.SSVShare) bool { + for _, existing := range shares { + if existing.ValidatorPubKey == share.ValidatorPubKey { + return true + } + } + return false +} + +func containsCommittee(committees []*Committee, committeeID spectypes.CommitteeID) bool { + for _, committee := range committees { + if committee.ID == committeeID { + return true + } + } + return false +} + +func addNewCommitteeOperators(committee *Committee, shareMembers []*spectypes.ShareMember) { + seen := make(map[spectypes.OperatorID]struct{}, len(committee.Operators)) + for _, opID := range committee.Operators { + seen[opID] = struct{}{} + } + + for _, member := range shareMembers { + if _, exists := seen[member.Signer]; !exists { + committee.Operators = append(committee.Operators, member.Signer) + seen[member.Signer] = struct{}{} + } + } + slices.Sort(committee.Operators) +} + +func removeShareFromCommittee(committee *Committee, share *types.SSVShare) (found bool) { + for i, validator := range committee.Validators { + if validator.ValidatorPubKey == share.ValidatorPubKey { + committee.Validators = append(committee.Validators[:i], committee.Validators[i+1:]...) + committee.Indices = append(committee.Indices[:i], committee.Indices[i+1:]...) + return true + } + } + return false +} + +func removeShareFromOperator(data *sharesAndCommittees, share *types.SSVShare) (found bool) { + for i, s := range data.shares { + if s.ValidatorPubKey == share.ValidatorPubKey { + data.shares = append(data.shares[:i], data.shares[i+1:]...) + return true + } + } + return false +} + +func removeCommitteeFromOperator(data *sharesAndCommittees, committeeID spectypes.CommitteeID) (found bool) { + for i, committee := range data.committees { + if committee.ID == committeeID { + data.committees = append(data.committees[:i], data.committees[i+1:]...) + return true + } + } + return false +} + func buildCommittee(shares []*types.SSVShare) *Committee { committee := &Committee{ ID: shares[0].CommitteeID(), diff --git a/registry/storage/validatorstore_test.go b/registry/storage/validatorstore_test.go index f151c9dfc1..11536c3c6d 100644 --- a/registry/storage/validatorstore_test.go +++ b/registry/storage/validatorstore_test.go @@ -1,10 +1,13 @@ package storage import ( + cryptorand "crypto/rand" "encoding/binary" "fmt" + "math" "math/rand" "os" + "slices" "sync" "sync/atomic" "testing" @@ -655,7 +658,7 @@ func TestValidatorStore_HandleNilAndEmptyStates(t *testing.T) { ValidatorPubKey: spectypes.ValidatorPK{99, 88, 77}, }, }) - require.NoError(t, err) + require.ErrorContains(t, err, "committee not found") // Ensure store remains unaffected require.Len(t, store.Validators(), 0) require.Len(t, store.Committees(), 0) @@ -755,7 +758,9 @@ func TestValidatorStore_UpdateNonExistingShare(t *testing.T) { t.Run("update non-existing share", func(t *testing.T) { require.NotPanics(t, func() { - require.NoError(t, store.handleSharesUpdated(share1)) // Update without adding + err := store.handleSharesUpdated(share1) + require.Error(t, err) + require.Contains(t, err.Error(), "committee not found") }) require.Len(t, store.Validators(), 0) @@ -791,43 +796,6 @@ func TestValidatorStore_RemoveNonExistingShare(t *testing.T) { }) } -func TestValidatorStore_UpdateNilData(t *testing.T) { - shareMap := map[spectypes.ValidatorPK]*ssvtypes.SSVShare{} - - store := newValidatorStore( - func() []*ssvtypes.SSVShare { return maps.Values(shareMap) }, - func(pubKey []byte) (*ssvtypes.SSVShare, bool) { - share := shareMap[spectypes.ValidatorPK(pubKey)] - if share == nil { - return nil, false - } - return share, true - }, - ) - - // Add a valid share and simulate a nil entry in byOperatorID - shareMap[share1.ValidatorPubKey] = share1 - require.NoError(t, store.handleSharesAdded(share1)) - - // Manually set a nil entry for a signer in byOperatorID - store.mu.Lock() - store.byOperatorID[share1.Committee[0].Signer] = nil - store.mu.Unlock() - - t.Run("update with nil data in byOperatorID", func(t *testing.T) { - require.NotPanics(t, func() { - require.NoError(t, store.handleSharesUpdated(share1)) // Attempt to update share1 - }) - - // Validate that the state remains consistent and does not crash - require.Len(t, store.Validators(), 1) - - s, e := store.Validator(share1.ValidatorPubKey[:]) - require.True(t, e) - require.Equal(t, share1, s) - }) -} - func TestValidatorStore_HandlingDifferentStatuses(t *testing.T) { shareMap := map[spectypes.ValidatorPK]*ssvtypes.SSVShare{} @@ -1012,62 +980,6 @@ func TestValidatorStore_InvalidCommitteeHandling(t *testing.T) { }) } -func TestValidatorStore_HighContentionConcurrency(t *testing.T) { - shareMap := map[spectypes.ValidatorPK]*ssvtypes.SSVShare{} - store := newValidatorStore( - func() []*ssvtypes.SSVShare { return maps.Values(shareMap) }, - func(pubKey []byte) (*ssvtypes.SSVShare, bool) { - share := shareMap[spectypes.ValidatorPK(pubKey)] - if share == nil { - return nil, false - } - return share, true - }, - ) - - shareMap[share1.ValidatorPubKey] = share1 - shareMap[share2.ValidatorPubKey] = share2 - require.NoError(t, store.handleSharesAdded(share1, share2)) - - var wg sync.WaitGroup - wg.Add(100) - - // High contention test with concurrent read, add, update, and remove - for i := 0; i < 25; i++ { - go func() { - defer wg.Done() - require.NoError(t, store.handleSharesAdded(share1, share2)) - }() - go func() { - defer wg.Done() - require.NoError(t, store.handleSharesUpdated(updatedShare2)) - }() - go func() { - defer wg.Done() - require.NoError(t, store.handleSharesAdded(share1)) - }() - go func() { - defer wg.Done() - _, _ = store.Validator(share1.ValidatorPubKey[:]) - _, _ = store.Committee(share1.CommitteeID()) - _ = store.Validators() - _ = store.Committees() - }() - } - - wg.Wait() - - t.Run("validate high contention state", func(t *testing.T) { - // Check that the store is consistent and valid after high contention - require.NotPanics(t, func() { - store.Validators() - store.Committees() - store.OperatorValidators(1) - store.OperatorCommittees(1) - }) - }) -} - func TestValidatorStore_BulkAddUpdate(t *testing.T) { shareMap := map[spectypes.ValidatorPK]*ssvtypes.SSVShare{} @@ -1175,3 +1087,152 @@ func TestValidatorStore_ComprehensiveIndex(t *testing.T) { require.Nil(t, s, "Validator should be nil after removal") }) } + +// requireValidatorStoreIntegrity checks that every function of the ValidatorStore returns the expected results, +// by reconstructing the expected state from the given shares and comparing it to the actual state of the store. +// This may seem like an overkill, but as ValidatorStore's implementation becomes more optimized and complex, +// it's a good way to double-check it with a dumb implementation that never changes. +func requireValidatorStoreIntegrity(t *testing.T, store ValidatorStore, shares []*ssvtypes.SSVShare) { + // Check that there are no false positives. + const nonExistingIndex = phase0.ValidatorIndex(math.MaxUint64 - 1) + const nonExistingOperatorID = spectypes.OperatorID(math.MaxUint64 - 1) + var nonExistingCommitteeID spectypes.CommitteeID + n, err := cryptorand.Read(nonExistingCommitteeID[:]) + require.NoError(t, err) + require.Equal(t, len(nonExistingCommitteeID), n) + + validator, exists := store.ValidatorByIndex(nonExistingIndex) + require.False(t, exists) + require.Nil(t, validator) + + operatorShares := store.OperatorValidators(nonExistingOperatorID) + require.Empty(t, operatorShares) + + committee, exists := store.Committee(nonExistingCommitteeID) + require.False(t, exists) + require.Nil(t, committee) + + // Check Validator(pubkey) and ValidatorByIndex(index) + for _, share := range shares { + byPubKey, exists := store.Validator(share.ValidatorPubKey[:]) + require.True(t, exists, "validator %x not found", share.ValidatorPubKey) + requireEqualShare(t, share, byPubKey) + + byIndex, exists := store.ValidatorByIndex(share.Metadata.BeaconMetadata.Index) + require.True(t, exists, "validator %d not found", share.Metadata.BeaconMetadata.Index) + requireEqualShare(t, share, byIndex) + } + + // Reconstruct hierarchy to check integrity of operators and committees. + byOperator := make(map[spectypes.OperatorID][]*ssvtypes.SSVShare) + byCommittee := make(map[spectypes.CommitteeID][]*ssvtypes.SSVShare) + committeeOperators := make(map[spectypes.CommitteeID]map[spectypes.OperatorID]struct{}) + operatorCommittees := make(map[spectypes.OperatorID]map[spectypes.CommitteeID]struct{}) + for _, share := range shares { + id := share.CommitteeID() + byCommittee[id] = append(byCommittee[id], share) + for _, operator := range share.Committee { + byOperator[operator.Signer] = append(byOperator[operator.Signer], share) + + if committeeOperators[id] == nil { + committeeOperators[id] = make(map[spectypes.OperatorID]struct{}) + } + committeeOperators[id][operator.Signer] = struct{}{} + + if operatorCommittees[operator.Signer] == nil { + operatorCommittees[operator.Signer] = make(map[spectypes.CommitteeID]struct{}) + } + operatorCommittees[operator.Signer][id] = struct{}{} + } + } + + // Check OperatorValidators(operatorID) + for operatorID, shares := range byOperator { + operatorShares := store.OperatorValidators(operatorID) + requireEqualShares(t, shares, operatorShares) + } + + // Check Committees(cmtID) + for cmtID, shares := range byCommittee { + cmt, exists := store.Committee(cmtID) + require.True(t, exists) + requireEqualShares(t, shares, cmt.Validators) + + operatorIDs := maps.Keys(committeeOperators[cmtID]) + slices.Sort(operatorIDs) + require.Equal(t, operatorIDs, cmt.Operators, "committee %s has %d operators, but %d in store", cmtID, len(operatorIDs), len(cmt.Operators)) + } + + // Check Committees() + storeCommittees := store.Committees() + require.Equal(t, len(byCommittee), len(storeCommittees)) + for _, cmt := range storeCommittees { + _, ok := byCommittee[cmt.ID] + require.True(t, ok) + } + + // Check Validators() + storeValidators := store.Validators() + require.Equal(t, len(shares), len(storeValidators)) + for _, share := range shares { + storeIndex := slices.IndexFunc(storeValidators, func(validator *ssvtypes.SSVShare) bool { + return validator.ValidatorPubKey == share.ValidatorPubKey + }) + require.NotEqual(t, -1, storeIndex) + requireEqualShare(t, share, storeValidators[storeIndex]) + } + + // Check OperatorCommittees(operatorID) + for operatorID, committees := range operatorCommittees { + storeOperatorCommittees := store.OperatorCommittees(operatorID) + require.Equal(t, len(committees), len(storeOperatorCommittees), "operator %d has %d committees, but %d in store", operatorID, len(committees), len(storeOperatorCommittees)) + for committee := range committees { + // Find the committee in the store. + storeIndex := slices.IndexFunc(storeOperatorCommittees, func(storeCommittee *Committee) bool { + return storeCommittee.ID == committee + }) + require.NotEqual(t, -1, storeIndex) + + // Compare shares. + storeOperatorCommittee := storeOperatorCommittees[storeIndex] + requireEqualShares(t, byCommittee[committee], storeOperatorCommittee.Validators, "committee %v doesn't have expected shares", storeOperatorCommittee.Operators) + + // Compare operator IDs. + operatorIDs := maps.Keys(committeeOperators[committee]) + slices.Sort(operatorIDs) + require.Equal(t, operatorIDs, storeOperatorCommittee.Operators) + + // Compare indices. + expectedIndices := make([]phase0.ValidatorIndex, len(storeOperatorCommittee.Validators)) + for i, validator := range storeOperatorCommittee.Validators { + expectedIndices[i] = validator.Metadata.BeaconMetadata.Index + } + slices.Sort(expectedIndices) + storeIndices := make([]phase0.ValidatorIndex, len(storeOperatorCommittee.Validators)) + copy(storeIndices, storeOperatorCommittee.Indices) + slices.Sort(storeIndices) + require.Equal(t, expectedIndices, storeIndices) + } + } + + // Check ParticipatingValidators(epoch) + var epoch = phase0.Epoch(math.MaxUint64 - 1) + var participatingValidators []*ssvtypes.SSVShare + var participatingCommittees = make(map[spectypes.CommitteeID]struct{}) + for _, share := range shares { + if share.IsParticipating(epoch) { + participatingValidators = append(participatingValidators, share) + participatingCommittees[share.CommitteeID()] = struct{}{} + } + } + storeParticipatingValidators := store.ParticipatingValidators(epoch) + requireEqualShares(t, participatingValidators, storeParticipatingValidators) + + // Check ParticipatingCommittees(epoch) + storeParticipatingCommittees := store.ParticipatingCommittees(epoch) + require.Equal(t, len(participatingCommittees), len(storeParticipatingCommittees)) + for _, cmt := range storeParticipatingCommittees { + _, ok := participatingCommittees[cmt.ID] + require.True(t, ok) + } +} From ee94c54b7ccddee48a4cf6a74313d00fb108916b Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 18 Dec 2024 14:14:11 +0200 Subject: [PATCH 02/12] prevent duplicate shares in `byOperator` data.shares --- registry/storage/validatorstore.go | 4 +- registry/storage/validatorstore_test.go | 62 +++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 263f9d7990..87088468b1 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -267,7 +267,9 @@ func (c *validatorStore) handleSharesAdded(shares ...*types.SSVShare) error { } } - data.shares = append(data.shares, share) + if !containsShare(data.shares, share) { + data.shares = append(data.shares, share) + } if !containsCommittee(data.committees, committee.ID) { data.committees = append(data.committees, committee) diff --git a/registry/storage/validatorstore_test.go b/registry/storage/validatorstore_test.go index 11536c3c6d..95d073379d 100644 --- a/registry/storage/validatorstore_test.go +++ b/registry/storage/validatorstore_test.go @@ -1088,6 +1088,68 @@ func TestValidatorStore_ComprehensiveIndex(t *testing.T) { }) } +func TestValidatorStore_HandleDuplicateSharesAdded(t *testing.T) { + shareMap := map[spectypes.ValidatorPK]*ssvtypes.SSVShare{} + + store := newValidatorStore( + func() []*ssvtypes.SSVShare { return maps.Values(shareMap) }, + func(pubKey []byte) (*ssvtypes.SSVShare, bool) { + share := shareMap[spectypes.ValidatorPK(pubKey)] + if share == nil { + return nil, false + } + return share, true + }, + ) + + // Create a share + duplicateShare := &ssvtypes.SSVShare{ + Share: spectypes.Share{ + ValidatorIndex: phase0.ValidatorIndex(1), + ValidatorPubKey: spectypes.ValidatorPK{1, 2, 3}, + SharePubKey: spectypes.ShareValidatorPK{4, 5, 6}, + Committee: []*spectypes.ShareMember{{Signer: 1}}, + FeeRecipientAddress: [20]byte{10, 20, 30}, + Graffiti: []byte("duplicate_test"), + }, + Metadata: ssvtypes.Metadata{ + BeaconMetadata: &beaconprotocol.ValidatorMetadata{ + Index: phase0.ValidatorIndex(1), + ActivationEpoch: 100, + Status: eth2apiv1.ValidatorStatePendingQueued, + }, + OwnerAddress: common.HexToAddress("0x12345"), + Liquidated: false, + }, + } + + // Add the same share multiple times + require.NoError(t, store.handleSharesAdded(duplicateShare)) + require.NoError(t, store.handleSharesAdded(duplicateShare)) + + t.Run("check no duplicates in data.shares", func(t *testing.T) { + // Validate the internal state for operator ID + data, exists := store.byOperatorID[duplicateShare.Committee[0].Signer] + require.True(t, exists, "operator data should exist") + require.NotNil(t, data, "operator data should not be nil") + + // Ensure no duplicates in shares + require.Len(t, data.shares, 1, "data.shares should not contain duplicate entries") + require.Contains(t, data.shares, duplicateShare, "data.shares should contain the added share") + }) + + t.Run("check no duplicates in committee validators", func(t *testing.T) { + committeeID := duplicateShare.CommitteeID() + committee, exists := store.byCommitteeID[committeeID] + require.True(t, exists, "committee should exist") + require.NotNil(t, committee, "committee should not be nil") + + // Ensure no duplicates in committee validators + require.Len(t, committee.Validators, 1, "committee.Validators should not contain duplicate entries") + require.Contains(t, committee.Validators, duplicateShare, "committee.Validators should contain the added share") + }) +} + // requireValidatorStoreIntegrity checks that every function of the ValidatorStore returns the expected results, // by reconstructing the expected state from the given shares and comparing it to the actual state of the store. // This may seem like an overkill, but as ValidatorStore's implementation becomes more optimized and complex, From 43d8cc3022353f2722cd550b7b593afd3b2ad5b4 Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:17:11 +0200 Subject: [PATCH 03/12] avoid shared state issues --- registry/storage/validatorstore.go | 158 ++++++++++++++++------------- 1 file changed, 85 insertions(+), 73 deletions(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 87088468b1..11a9fc00ee 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -239,22 +239,20 @@ func (c *validatorStore) handleSharesAdded(shares ...*types.SSVShare) error { // Update byCommitteeID committeeID := share.CommitteeID() committee, exists := c.byCommitteeID[committeeID] - - if !exists { - committee = &Committee{ - ID: committeeID, - Operators: []spectypes.OperatorID{}, - Validators: []*types.SSVShare{}, - Indices: []phase0.ValidatorIndex{}, + if exists { + // Verify share does not already exist in committee. + if containsShare(committee.Validators, share) { + // Corrupt state. + return fmt.Errorf("share already exists in committee. validator_pubkey=%s committee_id=%s", + hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committeeID[:])) } - } - if !containsShare(committee.Validators, share) { - committee.Validators = append(committee.Validators, share) - committee.Indices = append(committee.Indices, share.ValidatorIndex) + // Rebuild committee. + committee = buildCommittee(append(committee.Validators, share)) + } else { + // Build new committee. + committee = buildCommittee([]*types.SSVShare{share}) } - - addNewCommitteeOperators(committee, share.Committee) c.byCommitteeID[committee.ID] = committee // Update byOperatorID @@ -267,13 +265,29 @@ func (c *validatorStore) handleSharesAdded(shares ...*types.SSVShare) error { } } - if !containsShare(data.shares, share) { - data.shares = append(data.shares, share) + if containsShare(data.shares, share) { + // Corrupt state. + return fmt.Errorf("share already exists in operator. validator_pubkey=%s operator_id=%d", + hex.EncodeToString(share.ValidatorPubKey[:]), operator.Signer) + } + data.shares = append(data.shares, share) + + // Update committee at index. + // Replace with a new reference to avoid shared state issues. + newCommittees := append([]*Committee(nil), data.committees...) + updated := false + for i, c := range newCommittees { + if c.ID == committee.ID { + newCommittees[i] = committee + updated = true + break + } } - if !containsCommittee(data.committees, committee.ID) { - data.committees = append(data.committees, committee) + if !updated { + newCommittees = append(newCommittees, committee) } + data.committees = newCommittees c.byOperatorID[operator.Signer] = data } @@ -302,9 +316,8 @@ func (c *validatorStore) handleShareRemoved(share *types.SSVShare) error { return fmt.Errorf("committee not found. id=%s", hex.EncodeToString(committeeID[:])) } - if !removeShareFromCommittee(committee, share) { - return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", - hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committeeID[:])) + if err := removeShareFromCommittee(committee, share); err != nil { + return fmt.Errorf("failed to remove share from committee. %w", err) } committeeRemoved := len(committee.Validators) == 0 @@ -324,15 +337,26 @@ func (c *validatorStore) handleShareRemoved(share *types.SSVShare) error { hex.EncodeToString(share.ValidatorPubKey[:]), operator.Signer) } - if committeeRemoved { - if !removeCommitteeFromOperator(data, committee.ID) { - return fmt.Errorf("committee not found in operator. committee_id=%s operator_id=%d", - hex.EncodeToString(committeeID[:]), operator.Signer) + // Copy `committees` to avoid shared state issues. + newCommittees := append([]*Committee(nil), data.committees...) + for i, c := range newCommittees { + if c.ID == committeeID { + if committeeRemoved { + // Remove the committee if it was completely removed + newCommittees = append(newCommittees[:i], newCommittees[i+1:]...) + } else { + // Replace the committee with the updated one + newCommittees[i] = committee + } + break } } + data.committees = newCommittees if len(data.shares) == 0 { delete(c.byOperatorID, operator.Signer) + } else { + c.byOperatorID[operator.Signer] = data } } @@ -360,16 +384,16 @@ func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) error { return fmt.Errorf("committee not found. id=%s", hex.EncodeToString(committeeID[:])) } - shareUpdated := false + updated := false for i, validator := range committee.Validators { if validator.ValidatorPubKey == share.ValidatorPubKey { committee.Validators[i] = share committee.Indices[i] = share.ValidatorIndex - shareUpdated = true + updated = true break } } - if !shareUpdated { + if !updated { return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committeeID[:])) } @@ -381,18 +405,29 @@ func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) error { return fmt.Errorf("operator not found. operator_id=%d", shareMember.Signer) } - shareUpdated = false + updated = false for i, s := range data.shares { if s.ValidatorPubKey == share.ValidatorPubKey { data.shares[i] = share - shareUpdated = true + updated = true break } } - if !shareUpdated { + if !updated { return fmt.Errorf("share not found in operator. validator_pubkey=%s operator_id=%d", hex.EncodeToString(share.ValidatorPubKey[:]), shareMember.Signer) } + + // Copy `committees` to avoid shared state issues. + newCommittees := append([]*Committee(nil), data.committees...) + for i, c := range newCommittees { + if c.ID == committeeID { + newCommittees[i] = committee + break + } + } + data.committees = newCommittees + c.byOperatorID[shareMember.Signer] = data } } @@ -417,39 +452,25 @@ func containsShare(shares []*types.SSVShare, share *types.SSVShare) bool { return false } -func containsCommittee(committees []*Committee, committeeID spectypes.CommitteeID) bool { - for _, committee := range committees { - if committee.ID == committeeID { - return true - } - } - return false -} - -func addNewCommitteeOperators(committee *Committee, shareMembers []*spectypes.ShareMember) { - seen := make(map[spectypes.OperatorID]struct{}, len(committee.Operators)) - for _, opID := range committee.Operators { - seen[opID] = struct{}{} - } - - for _, member := range shareMembers { - if _, exists := seen[member.Signer]; !exists { - committee.Operators = append(committee.Operators, member.Signer) - seen[member.Signer] = struct{}{} - } - } - slices.Sort(committee.Operators) -} - -func removeShareFromCommittee(committee *Committee, share *types.SSVShare) (found bool) { +func removeShareFromCommittee(committee *Committee, share *types.SSVShare) error { for i, validator := range committee.Validators { if validator.ValidatorPubKey == share.ValidatorPubKey { + if validator.BeaconMetadata.Index != committee.Indices[i] { + // Corrupt state. + return fmt.Errorf("share index mismatch. validator_pubkey=%s committee_id=%s validator_index=%d committee_index=%d", + hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:]), validator.ValidatorIndex, committee.Indices[i]) + } + + // Remove share. committee.Validators = append(committee.Validators[:i], committee.Validators[i+1:]...) committee.Indices = append(committee.Indices[:i], committee.Indices[i+1:]...) - return true + return nil } } - return false + + // Corrupt state. + return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", + hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) } func removeShareFromOperator(data *sharesAndCommittees, share *types.SSVShare) (found bool) { @@ -462,17 +483,9 @@ func removeShareFromOperator(data *sharesAndCommittees, share *types.SSVShare) ( return false } -func removeCommitteeFromOperator(data *sharesAndCommittees, committeeID spectypes.CommitteeID) (found bool) { - for i, committee := range data.committees { - if committee.ID == committeeID { - data.committees = append(data.committees[:i], data.committees[i+1:]...) - return true - } - } - return false -} - func buildCommittee(shares []*types.SSVShare) *Committee { + // TODO: verify all shares are of the same committee. + committee := &Committee{ ID: shares[0].CommitteeID(), Operators: make([]spectypes.OperatorID, 0, len(shares)), @@ -480,16 +493,15 @@ func buildCommittee(shares []*types.SSVShare) *Committee { Indices: make([]phase0.ValidatorIndex, 0, len(shares)), } - seenOperators := make(map[spectypes.OperatorID]struct{}) + // Set operator IDs. + for _, member := range shares[0].Committee { + committee.Operators = append(committee.Operators, member.Signer) + } + // Set validator indices. for _, share := range shares { - for _, shareMember := range share.Committee { - seenOperators[shareMember.Signer] = struct{}{} - } committee.Indices = append(committee.Indices, share.ValidatorIndex) } - - committee.Operators = maps.Keys(seenOperators) slices.Sort(committee.Operators) return committee From 1d800c5741fbfdc5ad02052a69a7f91f7d6e0b4e Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:17:36 +0200 Subject: [PATCH 04/12] fix tests to adjusted logic --- registry/storage/shares_test.go | 8 +++--- registry/storage/validatorstore_test.go | 35 +++++++++++++------------ 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/registry/storage/shares_test.go b/registry/storage/shares_test.go index 8e2d365cae..b3750039ed 100644 --- a/registry/storage/shares_test.go +++ b/registry/storage/shares_test.go @@ -479,20 +479,22 @@ func requireEqualShare(t *testing.T, expected, actual *ssvtypes.SSVShare, msgAnd func requireEqualShares(t *testing.T, expected, actual []*ssvtypes.SSVShare, msgAndArgs ...any) { require.Equal(t, len(expected), len(actual), msgAndArgs...) - // Sort shares by validator pubkey. + // Sort shares by validator pubkey for comparison without mutating input slices. expectedSorted := make([]*ssvtypes.SSVShare, len(expected)) copy(expectedSorted, expected) slices.SortFunc(expectedSorted, func(a, b *ssvtypes.SSVShare) int { return strings.Compare(string(a.ValidatorPubKey[:]), string(b.ValidatorPubKey[:])) }) + actualSorted := make([]*ssvtypes.SSVShare, len(actual)) copy(actualSorted, actual) - slices.SortFunc(actual, func(a, b *ssvtypes.SSVShare) int { + slices.SortFunc(actualSorted, func(a, b *ssvtypes.SSVShare) int { return strings.Compare(string(a.ValidatorPubKey[:]), string(b.ValidatorPubKey[:])) }) + // Compare the sorted shares for i, share := range expectedSorted { - requireEqualShare(t, share, actual[i], msgAndArgs...) + requireEqualShare(t, share, actualSorted[i], msgAndArgs...) } } diff --git a/registry/storage/validatorstore_test.go b/registry/storage/validatorstore_test.go index 95d073379d..d27c22c76c 100644 --- a/registry/storage/validatorstore_test.go +++ b/registry/storage/validatorstore_test.go @@ -736,7 +736,9 @@ func TestValidatorStore_AddDuplicateShares(t *testing.T) { require.NoError(t, store.handleSharesAdded(share1)) t.Run("validate store after adding duplicate shares", func(t *testing.T) { - require.NoError(t, store.handleSharesAdded(share1)) // Add duplicate + err := store.handleSharesAdded(share1) + require.Error(t, err) + require.Contains(t, err.Error(), "share already exists in committe") require.Len(t, store.Validators(), 1) require.Contains(t, store.Validators(), share1) }) @@ -921,9 +923,11 @@ func TestValidatorStore_MixedOperations(t *testing.T) { require.NoError(t, store.handleSharesAdded(share1, share2)) // Mixed operations - require.NoError(t, store.handleSharesAdded(share1)) shareMap[share1.ValidatorPubKey] = share1 // Re-add share1 - require.NoError(t, store.handleSharesAdded(share1)) + err := store.handleSharesAdded(share1) + require.Error(t, err) + require.Contains(t, err.Error(), "share already exists in committee") + shareMap[updatedShare2.ValidatorPubKey] = updatedShare2 require.NoError(t, store.handleSharesUpdated(updatedShare2)) // Update share2 @@ -949,9 +953,11 @@ func TestValidatorStore_InvalidCommitteeHandling(t *testing.T) { invalidCommitteeShare := &ssvtypes.SSVShare{ Share: spectypes.Share{ - ValidatorIndex: phase0.ValidatorIndex(10), - ValidatorPubKey: spectypes.ValidatorPK{10, 20, 30}, - SharePubKey: spectypes.ShareValidatorPK{40, 50, 60}, + ValidatorIndex: phase0.ValidatorIndex(10), + ValidatorPubKey: spectypes.ValidatorPK{10, 20, 30}, + SharePubKey: spectypes.ShareValidatorPK{40, 50, 60}, + // Invalid committee with duplicate members. + // This scenario is included for testing purposes only, as the event handler should validate and prevent duplicate members. Committee: []*spectypes.ShareMember{{Signer: 1}, {Signer: 1}}, // Duplicate members FeeRecipientAddress: [20]byte{70, 80, 90}, Graffiti: []byte("invalid_committee"), @@ -968,16 +974,7 @@ func TestValidatorStore_InvalidCommitteeHandling(t *testing.T) { } shareMap[invalidCommitteeShare.ValidatorPubKey] = invalidCommitteeShare - require.NoError(t, store.handleSharesAdded(invalidCommitteeShare)) - - t.Run("check invalid committee handling", func(t *testing.T) { - require.Len(t, store.Validators(), 1) - require.Contains(t, store.Validators(), invalidCommitteeShare) - committee, exists := store.Committee(invalidCommitteeShare.CommitteeID()) - require.True(t, exists) - require.NotNil(t, committee) - require.Len(t, committee.Operators, 1) // Should handle duplicates - }) + require.Error(t, store.handleSharesAdded(invalidCommitteeShare)) } func TestValidatorStore_BulkAddUpdate(t *testing.T) { @@ -1062,6 +1059,7 @@ func TestValidatorStore_ComprehensiveIndex(t *testing.T) { Index: phase0.ValidatorIndex(10), }, } + noMetadataShare.ValidatorIndex = phase0.ValidatorIndex(10) t.Run("update share with metadata", func(t *testing.T) { require.NoError(t, store.handleSharesUpdated(noMetadataShare)) @@ -1125,7 +1123,10 @@ func TestValidatorStore_HandleDuplicateSharesAdded(t *testing.T) { // Add the same share multiple times require.NoError(t, store.handleSharesAdded(duplicateShare)) - require.NoError(t, store.handleSharesAdded(duplicateShare)) + + err := store.handleSharesAdded(duplicateShare) + require.Error(t, err) + require.Contains(t, err.Error(), "share already exists in committee") t.Run("check no duplicates in data.shares", func(t *testing.T) { // Validate the internal state for operator ID From 6d2e975920f72311b20fd14b909eb95cccfc2819 Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:41:33 +0200 Subject: [PATCH 05/12] use share validatorindex --- registry/storage/validatorstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 11a9fc00ee..41d0473fde 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -455,7 +455,7 @@ func containsShare(shares []*types.SSVShare, share *types.SSVShare) bool { func removeShareFromCommittee(committee *Committee, share *types.SSVShare) error { for i, validator := range committee.Validators { if validator.ValidatorPubKey == share.ValidatorPubKey { - if validator.BeaconMetadata.Index != committee.Indices[i] { + if validator.ValidatorIndex != committee.Indices[i] { // Corrupt state. return fmt.Errorf("share index mismatch. validator_pubkey=%s committee_id=%s validator_index=%d committee_index=%d", hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:]), validator.ValidatorIndex, committee.Indices[i]) From c73ee43b49e0b2614e8f8503bac8035eb2e6707c Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 8 Jan 2025 17:54:52 +0200 Subject: [PATCH 06/12] check for duplicate operator in share --- registry/storage/validatorstore.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 41d0473fde..44fca3b350 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -256,7 +256,14 @@ func (c *validatorStore) handleSharesAdded(shares ...*types.SSVShare) error { c.byCommitteeID[committee.ID] = committee // Update byOperatorID + seenOperators := make(map[spectypes.OperatorID]struct{}) for _, operator := range share.Committee { + if _, seen := seenOperators[operator.Signer]; seen { + // Corrupt state. + return fmt.Errorf("duplicate operator in share. operator_id=%d", operator.Signer) + } + seenOperators[operator.Signer] = struct{}{} + data, exists := c.byOperatorID[operator.Signer] if !exists { data = &sharesAndCommittees{ @@ -484,8 +491,6 @@ func removeShareFromOperator(data *sharesAndCommittees, share *types.SSVShare) ( } func buildCommittee(shares []*types.SSVShare) *Committee { - // TODO: verify all shares are of the same committee. - committee := &Committee{ ID: shares[0].CommitteeID(), Operators: make([]spectypes.OperatorID, 0, len(shares)), From e8995ab4f08f00ca9bd8f8734b58a6599265ad6b Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 8 Jan 2025 17:55:10 +0200 Subject: [PATCH 07/12] add error contains to tests --- registry/storage/validatorstore_test.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/registry/storage/validatorstore_test.go b/registry/storage/validatorstore_test.go index d27c22c76c..c862c94181 100644 --- a/registry/storage/validatorstore_test.go +++ b/registry/storage/validatorstore_test.go @@ -666,21 +666,21 @@ func TestValidatorStore_HandleNilAndEmptyStates(t *testing.T) { // Add nil share - this should be a no-op or handled gracefully t.Run("add nil share", func(t *testing.T) { - require.Error(t, store.handleSharesAdded(nil)) + require.ErrorContains(t, store.handleSharesAdded(nil), "nil share") require.Len(t, store.Validators(), 0) require.Len(t, store.Committees(), 0) }) // Update nil share - this should be a no-op or handled gracefully t.Run("update nil share", func(t *testing.T) { - require.Error(t, store.handleSharesUpdated(nil)) + require.ErrorContains(t, store.handleSharesUpdated(nil), "nil share") require.Len(t, store.Validators(), 0) require.Len(t, store.Committees(), 0) }) // Delete nil share - this should be a no-op or handled gracefully t.Run("delete nil share", func(t *testing.T) { - require.Error(t, store.handleShareRemoved(nil)) + require.ErrorContains(t, store.handleShareRemoved(nil), "nil share") require.Len(t, store.Validators(), 0) require.Len(t, store.Committees(), 0) }) @@ -737,8 +737,7 @@ func TestValidatorStore_AddDuplicateShares(t *testing.T) { t.Run("validate store after adding duplicate shares", func(t *testing.T) { err := store.handleSharesAdded(share1) - require.Error(t, err) - require.Contains(t, err.Error(), "share already exists in committe") + require.ErrorContains(t, err, "share already exists in committee") require.Len(t, store.Validators(), 1) require.Contains(t, store.Validators(), share1) }) @@ -761,8 +760,7 @@ func TestValidatorStore_UpdateNonExistingShare(t *testing.T) { t.Run("update non-existing share", func(t *testing.T) { require.NotPanics(t, func() { err := store.handleSharesUpdated(share1) - require.Error(t, err) - require.Contains(t, err.Error(), "committee not found") + require.ErrorContains(t, err, "committee not found") }) require.Len(t, store.Validators(), 0) @@ -925,8 +923,7 @@ func TestValidatorStore_MixedOperations(t *testing.T) { // Mixed operations shareMap[share1.ValidatorPubKey] = share1 // Re-add share1 err := store.handleSharesAdded(share1) - require.Error(t, err) - require.Contains(t, err.Error(), "share already exists in committee") + require.ErrorContains(t, err, "share already exists in committee") shareMap[updatedShare2.ValidatorPubKey] = updatedShare2 require.NoError(t, store.handleSharesUpdated(updatedShare2)) // Update share2 @@ -974,7 +971,8 @@ func TestValidatorStore_InvalidCommitteeHandling(t *testing.T) { } shareMap[invalidCommitteeShare.ValidatorPubKey] = invalidCommitteeShare - require.Error(t, store.handleSharesAdded(invalidCommitteeShare)) + err := store.handleSharesAdded(invalidCommitteeShare) + require.ErrorContains(t, err, "duplicate operator in share. operator_id=1") } func TestValidatorStore_BulkAddUpdate(t *testing.T) { @@ -1125,8 +1123,7 @@ func TestValidatorStore_HandleDuplicateSharesAdded(t *testing.T) { require.NoError(t, store.handleSharesAdded(duplicateShare)) err := store.handleSharesAdded(duplicateShare) - require.Error(t, err) - require.Contains(t, err.Error(), "share already exists in committee") + require.ErrorContains(t, err, "share already exists in committee") t.Run("check no duplicates in data.shares", func(t *testing.T) { // Validate the internal state for operator ID From 98b10e1a7bb052958db514c19497437141f2b0e9 Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 8 Jan 2025 18:43:28 +0200 Subject: [PATCH 08/12] avoid shared state in committee handling --- registry/storage/validatorstore.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 44fca3b350..9d59e4a475 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -460,6 +460,11 @@ func containsShare(shares []*types.SSVShare, share *types.SSVShare) bool { } func removeShareFromCommittee(committee *Committee, share *types.SSVShare) error { + // Create new slices to avoid modifying the original slices directly + newValidators := make([]*types.SSVShare, 0, len(committee.Validators)) + newIndices := make([]phase0.ValidatorIndex, 0, len(committee.Indices)) + + removed := false for i, validator := range committee.Validators { if validator.ValidatorPubKey == share.ValidatorPubKey { if validator.ValidatorIndex != committee.Indices[i] { @@ -467,17 +472,25 @@ func removeShareFromCommittee(committee *Committee, share *types.SSVShare) error return fmt.Errorf("share index mismatch. validator_pubkey=%s committee_id=%s validator_index=%d committee_index=%d", hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:]), validator.ValidatorIndex, committee.Indices[i]) } - - // Remove share. - committee.Validators = append(committee.Validators[:i], committee.Validators[i+1:]...) - committee.Indices = append(committee.Indices[:i], committee.Indices[i+1:]...) - return nil + removed = true + continue // Skip adding this validator and its index to the new slices } + + // Add remaining validators and indices to the new slices + newValidators = append(newValidators, validator) + newIndices = append(newIndices, committee.Indices[i]) } - // Corrupt state. - return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", - hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) + if !removed { + // Corrupt state. + return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", + hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) + } + + // Replace the original slices with the newly created slices + committee.Validators = newValidators + committee.Indices = newIndices + return nil } func removeShareFromOperator(data *sharesAndCommittees, share *types.SSVShare) (found bool) { From 006438ec33e65172c2121432d2df26874e54a635 Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 8 Jan 2025 18:59:44 +0200 Subject: [PATCH 09/12] avoid shared state in committee and operator updates --- registry/storage/validatorstore.go | 75 +++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 9d59e4a475..94f2908700 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -323,13 +323,16 @@ func (c *validatorStore) handleShareRemoved(share *types.SSVShare) error { return fmt.Errorf("committee not found. id=%s", hex.EncodeToString(committeeID[:])) } - if err := removeShareFromCommittee(committee, share); err != nil { + newCommittee, err := removeShareFromCommittee(committee, share) + if err != nil { return fmt.Errorf("failed to remove share from committee. %w", err) } - committeeRemoved := len(committee.Validators) == 0 + committeeRemoved := len(newCommittee.Validators) == 0 if committeeRemoved { - delete(c.byCommitteeID, committee.ID) + delete(c.byCommitteeID, newCommittee.ID) + } else { + c.byCommitteeID[newCommittee.ID] = newCommittee } // Update byOperatorID @@ -353,7 +356,7 @@ func (c *validatorStore) handleShareRemoved(share *types.SSVShare) error { newCommittees = append(newCommittees[:i], newCommittees[i+1:]...) } else { // Replace the committee with the updated one - newCommittees[i] = committee + newCommittees[i] = newCommittee } break } @@ -391,19 +394,11 @@ func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) error { return fmt.Errorf("committee not found. id=%s", hex.EncodeToString(committeeID[:])) } - updated := false - for i, validator := range committee.Validators { - if validator.ValidatorPubKey == share.ValidatorPubKey { - committee.Validators[i] = share - committee.Indices[i] = share.ValidatorIndex - updated = true - break - } - } - if !updated { - return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", - hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committeeID[:])) + newCommittee, err := updateCommitteeWithShare(committee, share) + if err != nil { + return fmt.Errorf("failed to update share in committee. %w", err) } + c.byCommitteeID[committeeID] = newCommittee // Update byOperatorID for _, shareMember := range share.Committee { @@ -412,7 +407,7 @@ func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) error { return fmt.Errorf("operator not found. operator_id=%d", shareMember.Signer) } - updated = false + updated := false for i, s := range data.shares { if s.ValidatorPubKey == share.ValidatorPubKey { data.shares[i] = share @@ -429,7 +424,7 @@ func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) error { newCommittees := append([]*Committee(nil), data.committees...) for i, c := range newCommittees { if c.ID == committeeID { - newCommittees[i] = committee + newCommittees[i] = newCommittee break } } @@ -459,7 +454,7 @@ func containsShare(shares []*types.SSVShare, share *types.SSVShare) bool { return false } -func removeShareFromCommittee(committee *Committee, share *types.SSVShare) error { +func removeShareFromCommittee(committee *Committee, share *types.SSVShare) (*Committee, error) { // Create new slices to avoid modifying the original slices directly newValidators := make([]*types.SSVShare, 0, len(committee.Validators)) newIndices := make([]phase0.ValidatorIndex, 0, len(committee.Indices)) @@ -469,7 +464,7 @@ func removeShareFromCommittee(committee *Committee, share *types.SSVShare) error if validator.ValidatorPubKey == share.ValidatorPubKey { if validator.ValidatorIndex != committee.Indices[i] { // Corrupt state. - return fmt.Errorf("share index mismatch. validator_pubkey=%s committee_id=%s validator_index=%d committee_index=%d", + return nil, fmt.Errorf("share index mismatch. validator_pubkey=%s committee_id=%s validator_index=%d committee_index=%d", hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:]), validator.ValidatorIndex, committee.Indices[i]) } removed = true @@ -483,14 +478,50 @@ func removeShareFromCommittee(committee *Committee, share *types.SSVShare) error if !removed { // Corrupt state. - return fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", + return nil, fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) } // Replace the original slices with the newly created slices committee.Validators = newValidators committee.Indices = newIndices - return nil + + newCommittee := &Committee{ + ID: committee.ID, + Operators: committee.Operators, + Validators: newValidators, + Indices: newIndices, + } + return newCommittee, nil +} + +func updateCommitteeWithShare(committee *Committee, share *types.SSVShare) (*Committee, error) { + newValidators := make([]*types.SSVShare, len(committee.Validators)) + newIndices := make([]phase0.ValidatorIndex, len(committee.Indices)) + copy(newValidators, committee.Validators) + copy(newIndices, committee.Indices) + + updated := false + for i, validator := range newValidators { + if validator.ValidatorPubKey == share.ValidatorPubKey { + newValidators[i] = share + newIndices[i] = share.ValidatorIndex + updated = true + break + } + } + + if !updated { + return nil, fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", + hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) + } + + return &Committee{ + ID: committee.ID, + Operators: committee.Operators, + Validators: newValidators, + Indices: newIndices, + }, nil } func removeShareFromOperator(data *sharesAndCommittees, share *types.SSVShare) (found bool) { From 3f4b0225f13d1354547993fe60776ef67881e3f5 Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Wed, 8 Jan 2025 19:29:16 +0200 Subject: [PATCH 10/12] reuse buildCommittee --- registry/storage/validatorstore.go | 68 ++++++++++++------------------ 1 file changed, 26 insertions(+), 42 deletions(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 94f2908700..1c09f7389c 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -454,74 +454,58 @@ func containsShare(shares []*types.SSVShare, share *types.SSVShare) bool { return false } -func removeShareFromCommittee(committee *Committee, share *types.SSVShare) (*Committee, error) { - // Create new slices to avoid modifying the original slices directly - newValidators := make([]*types.SSVShare, 0, len(committee.Validators)) - newIndices := make([]phase0.ValidatorIndex, 0, len(committee.Indices)) - +func removeShareFromCommittee(committee *Committee, shareToRemove *types.SSVShare) (*Committee, error) { + var shares []*types.SSVShare removed := false - for i, validator := range committee.Validators { - if validator.ValidatorPubKey == share.ValidatorPubKey { - if validator.ValidatorIndex != committee.Indices[i] { + + for i, share := range committee.Validators { + if share.ValidatorPubKey == shareToRemove.ValidatorPubKey { + if share.ValidatorIndex != committee.Indices[i] { // Corrupt state. return nil, fmt.Errorf("share index mismatch. validator_pubkey=%s committee_id=%s validator_index=%d committee_index=%d", - hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:]), validator.ValidatorIndex, committee.Indices[i]) + hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:]), share.ValidatorIndex, committee.Indices[i]) } removed = true - continue // Skip adding this validator and its index to the new slices + continue // Skip adding this share } - // Add remaining validators and indices to the new slices - newValidators = append(newValidators, validator) - newIndices = append(newIndices, committee.Indices[i]) + shares = append(shares, share) } if !removed { // Corrupt state. return nil, fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", - hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) + hex.EncodeToString(shareToRemove.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) } - // Replace the original slices with the newly created slices - committee.Validators = newValidators - committee.Indices = newIndices - - newCommittee := &Committee{ - ID: committee.ID, - Operators: committee.Operators, - Validators: newValidators, - Indices: newIndices, + if len(shares) == 0 { + return &Committee{ + ID: committee.ID, + }, nil } - return newCommittee, nil -} -func updateCommitteeWithShare(committee *Committee, share *types.SSVShare) (*Committee, error) { - newValidators := make([]*types.SSVShare, len(committee.Validators)) - newIndices := make([]phase0.ValidatorIndex, len(committee.Indices)) - copy(newValidators, committee.Validators) - copy(newIndices, committee.Indices) + return buildCommittee(shares), nil +} +func updateCommitteeWithShare(committee *Committee, shareToUpdate *types.SSVShare) (*Committee, error) { + var shares []*types.SSVShare updated := false - for i, validator := range newValidators { - if validator.ValidatorPubKey == share.ValidatorPubKey { - newValidators[i] = share - newIndices[i] = share.ValidatorIndex + + for _, share := range committee.Validators { + if share.ValidatorPubKey == shareToUpdate.ValidatorPubKey { updated = true - break + shares = append(shares, shareToUpdate) + } else { + shares = append(shares, share) } } if !updated { return nil, fmt.Errorf("share not found in committee. validator_pubkey=%s committee_id=%s", - hex.EncodeToString(share.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) + hex.EncodeToString(shareToUpdate.ValidatorPubKey[:]), hex.EncodeToString(committee.ID[:])) } - return &Committee{ - ID: committee.ID, - Operators: committee.Operators, - Validators: newValidators, - Indices: newIndices, - }, nil + return buildCommittee(shares), nil } func removeShareFromOperator(data *sharesAndCommittees, share *types.SSVShare) (found bool) { From 3ef73ef5aebb137331788128286f8a928a7f2719 Mon Sep 17 00:00:00 2001 From: moshe-blox Date: Wed, 8 Jan 2025 19:50:03 +0200 Subject: [PATCH 11/12] copy instead of append --- registry/storage/validatorstore.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 1c09f7389c..0579300d7d 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -348,7 +348,8 @@ func (c *validatorStore) handleShareRemoved(share *types.SSVShare) error { } // Copy `committees` to avoid shared state issues. - newCommittees := append([]*Committee(nil), data.committees...) + newCommittees := make([]*Committee, len(data.committees)) + copy(newCommittees, data.committees) for i, c := range newCommittees { if c.ID == committeeID { if committeeRemoved { @@ -421,7 +422,9 @@ func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) error { } // Copy `committees` to avoid shared state issues. - newCommittees := append([]*Committee(nil), data.committees...) + // newCommittees := append([]*Committee(nil), data.committees...) + newCommittees := make([]*Committee, len(data.committees)) + copy(newCommittees, data.committees) for i, c := range newCommittees { if c.ID == committeeID { newCommittees[i] = newCommittee @@ -488,15 +491,14 @@ func removeShareFromCommittee(committee *Committee, shareToRemove *types.SSVShar } func updateCommitteeWithShare(committee *Committee, shareToUpdate *types.SSVShare) (*Committee, error) { - var shares []*types.SSVShare - updated := false + shares := make([]*types.SSVShare, len(committee.Validators)) + copy(shares, committee.Validators) - for _, share := range committee.Validators { + updated := false + for i, share := range shares { if share.ValidatorPubKey == shareToUpdate.ValidatorPubKey { + shares[i] = shareToUpdate updated = true - shares = append(shares, shareToUpdate) - } else { - shares = append(shares, share) } } From 8be4d4226094f70282292d20786905d241ad3041 Mon Sep 17 00:00:00 2001 From: olegshmuelov <45327364+olegshmuelov@users.noreply.github.com> Date: Thu, 9 Jan 2025 12:16:03 +0200 Subject: [PATCH 12/12] use copy instead append --- registry/storage/validatorstore.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/registry/storage/validatorstore.go b/registry/storage/validatorstore.go index 0579300d7d..e4e094c189 100644 --- a/registry/storage/validatorstore.go +++ b/registry/storage/validatorstore.go @@ -279,10 +279,10 @@ func (c *validatorStore) handleSharesAdded(shares ...*types.SSVShare) error { } data.shares = append(data.shares, share) - // Update committee at index. - // Replace with a new reference to avoid shared state issues. - newCommittees := append([]*Committee(nil), data.committees...) + // Copy `committees` to avoid shared state issues. updated := false + newCommittees := make([]*Committee, len(data.committees)) + copy(newCommittees, data.committees) for i, c := range newCommittees { if c.ID == committee.ID { newCommittees[i] = committee @@ -295,7 +295,6 @@ func (c *validatorStore) handleSharesAdded(shares ...*types.SSVShare) error { newCommittees = append(newCommittees, committee) } data.committees = newCommittees - c.byOperatorID[operator.Signer] = data } } @@ -422,7 +421,6 @@ func (c *validatorStore) handleSharesUpdated(shares ...*types.SSVShare) error { } // Copy `committees` to avoid shared state issues. - // newCommittees := append([]*Committee(nil), data.committees...) newCommittees := make([]*Committee, len(data.committees)) copy(newCommittees, data.committees) for i, c := range newCommittees {