Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventhandler: implement handleOperatorRemoved #1796

Open
wants to merge 13 commits into
base: stage
Choose a base branch
from
2 changes: 2 additions & 0 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/api/handlers"
apiserver "github.com/ssvlabs/ssv/api/server"
"github.com/ssvlabs/ssv/beacon/goclient"
Expand Down Expand Up @@ -234,6 +235,7 @@
messageValidator := validation.New(
networkConfig,
nodeStorage.ValidatorStore(),
nodeStorage,

Check warning on line 238 in cli/operator/node.go

View check run for this annotation

Codecov / codecov/patch

cli/operator/node.go#L238

Added line #L238 was not covered by tests
dutyStore,
signatureVerifier,
validation.WithLogger(logger),
Expand Down
7 changes: 3 additions & 4 deletions eth/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,21 @@ import (
"math/big"
"time"

"github.com/ssvlabs/ssv/ekm"
"github.com/ssvlabs/ssv/observability"
"go.opentelemetry.io/otel/metric"

"github.com/attestantio/go-eth2-client/spec/phase0"
ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/ekm"
"github.com/ssvlabs/ssv/eth/contract"
"github.com/ssvlabs/ssv/eth/eventparser"
"github.com/ssvlabs/ssv/eth/executionclient"
"github.com/ssvlabs/ssv/eth/localevents"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/networkconfig"
"github.com/ssvlabs/ssv/observability"
operatordatastore "github.com/ssvlabs/ssv/operator/datastore"
"github.com/ssvlabs/ssv/operator/keys"
nodestorage "github.com/ssvlabs/ssv/operator/storage"
Expand Down
225 changes: 112 additions & 113 deletions eth/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,119 +256,6 @@ func TestHandleBlockEventsStream(t *testing.T) {
require.Equal(t, len(ops), len(operators))
})
})
t.Run("test OperatorRemoved event handle", func(t *testing.T) {

// Should return MalformedEventError and no changes to the state
t.Run("test OperatorRemoved incorrect operator ID", func(t *testing.T) {
// Call the contract method
_, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 100500)
require.NoError(t, err)
sim.Commit()

block := <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0])

eventsCh := make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

// Check that there is 1 registered operator
operators, err := eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))

// Handle the event
lastProcessedBlock, err := eh.HandleBlockEventsStream(ctx, eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++

// Check if the operator wasn't removed successfully
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))
})

// Receive event, unmarshall, parse, check parse event is not nil or with error, operator id is correct
// TODO: fix this test. It checks nothing, due the handleOperatorRemoved method is no-op currently
t.Run("test OperatorRemoved happy flow", func(t *testing.T) {
// Prepare a new operator to remove it later in this test
op, err := createOperators(1, operatorsCount)
require.NoError(t, err)
operatorsCount++

encodedPubKey, err := op[0].privateKey.Public().Base64()
require.NoError(t, err)

// Call the contract method
packedOperatorPubKey, err := eventparser.PackOperatorPublicKey(encodedPubKey)
require.NoError(t, err)
_, err = boundContract.SimcontractTransactor.RegisterOperator(auth, packedOperatorPubKey, big.NewInt(100_000_000))
require.NoError(t, err)

sim.Commit()

block := <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0xd839f31c14bd632f424e307b36abff63ca33684f77f28e35dc13718ef338f7f4"), block.Logs[0].Topics[0])

eventsCh := make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

// Check that there is no registered operators
operators, err := eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))

// Handle OperatorAdded event
lastProcessedBlock, err := eh.HandleBlockEventsStream(ctx, eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++
// Check storage for the new operator
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops)+1, len(operators))

// Now start the OperatorRemoved event handling
// Call the contract method
_, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 4)
require.NoError(t, err)
sim.Commit()

block = <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0])

eventsCh = make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops)+1, len(operators))

// Handle OperatorRemoved event
lastProcessedBlock, err = eh.HandleBlockEventsStream(ctx, eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++

// TODO: this should be adjusted when eth/eventhandler/handlers.go#L109 is resolved
// Check if the operator was removed successfully
//operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
//require.NoError(t, err)
//require.Equal(t, len(ops), len(operators))
})
})

// Receive event, unmarshall, parse, check parse event is not nil or with an error,
// public key is correct, owner is correct, operator ids are correct, shares are correct
Expand Down Expand Up @@ -1346,6 +1233,118 @@ func TestHandleBlockEventsStream(t *testing.T) {
require.False(t, share.Liquidated)
})
})

t.Run("test OperatorRemoved event handle", func(t *testing.T) {

// Should return MalformedEventError and no changes to the state
t.Run("test OperatorRemoved incorrect operator ID", func(t *testing.T) {
// Call the contract method
_, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 100500)
require.NoError(t, err)
sim.Commit()

block := <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0])

eventsCh := make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

// Check that there is 1 registered operator
operators, err := eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))

// Handle the event
lastProcessedBlock, err := eh.HandleBlockEventsStream(ctx, eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++

// Check if the operator wasn't removed successfully
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))
})

// Receive event, unmarshall, parse, check parse event is not nil or with error, operator id is correct
nkryuchkov marked this conversation as resolved.
Show resolved Hide resolved
t.Run("test OperatorRemoved happy flow", func(t *testing.T) {
// Prepare a new operator to remove it later in this test
op, err := createOperators(1, operatorsCount)
require.NoError(t, err)
operatorsCount++

encodedPubKey, err := op[0].privateKey.Public().Base64()
require.NoError(t, err)

// Call the contract method
packedOperatorPubKey, err := eventparser.PackOperatorPublicKey(encodedPubKey)
require.NoError(t, err)
_, err = boundContract.SimcontractTransactor.RegisterOperator(auth, packedOperatorPubKey, big.NewInt(100_000_000))
require.NoError(t, err)

sim.Commit()

block := <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0xd839f31c14bd632f424e307b36abff63ca33684f77f28e35dc13718ef338f7f4"), block.Logs[0].Topics[0])

eventsCh := make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

// Check that there is no registered operators
operators, err := eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))

// Handle OperatorAdded event
lastProcessedBlock, err := eh.HandleBlockEventsStream(ctx, eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++
// Check storage for the new operator
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops)+1, len(operators))

// Now start the OperatorRemoved event handling
// Call the contract method
_, err = boundContract.SimcontractTransactor.RemoveOperator(auth, 4)
require.NoError(t, err)
sim.Commit()
Comment on lines +1316 to +1320
Copy link
Contributor

@iurii-ssv iurii-ssv Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are removing the wrong operator here (wrong ID - not the one this test added) - this is because createOperators starts numerating operators with 1 (not 0) - not a big deal this being the last test ... but still better adjust it to use op[0].id instead

Copy link
Contributor

@iurii-ssv iurii-ssv Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, don't bother changing it just yet - I'm preparing a change (see #1796 (comment)) you'll probably wanna use (and will include this too).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in #1842


block = <-logs
require.NotEmpty(t, block.Logs)
require.Equal(t, ethcommon.HexToHash("0x0e0ba6c2b04de36d6d509ec5bd155c43a9fe862f8052096dd54f3902a74cca3e"), block.Logs[0].Topics[0])

eventsCh = make(chan executionclient.BlockLogs)
go func() {
defer close(eventsCh)
eventsCh <- block
}()

operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops)+1, len(operators))

// Handle OperatorRemoved event
lastProcessedBlock, err = eh.HandleBlockEventsStream(ctx, eventsCh, false)
require.Equal(t, blockNum+1, lastProcessedBlock)
require.NoError(t, err)
blockNum++

// Check if the operator was removed successfully
operators, err = eh.nodeStorage.ListOperators(nil, 0, 0)
require.NoError(t, err)
require.Equal(t, len(ops), len(operators))
})
})
}

func setupEventHandler(t *testing.T, ctx context.Context, logger *zap.Logger, network *networkconfig.NetworkConfig, operator *testOperator, useMockCtrl bool) (*EventHandler, *mocks.MockController, error) {
Expand Down
62 changes: 61 additions & 1 deletion eth/eventhandler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
if err != nil {
return fmt.Errorf("could not get operator data by public key: %w", err)
}

if pubkeyExists {
logger.Warn("malformed event: operator public key already exists",
fields.OperatorPubKey(operatorData.PublicKey))
Expand All @@ -83,11 +84,50 @@
if err != nil {
return fmt.Errorf("save operator data: %w", err)
}

if exists {
logger.Debug("operator data already exists")
return nil
}

var modifiedShares []*ssvtypes.SSVShare
for _, share := range eh.nodeStorage.Shares().List(txn, registrystorage.ByOperatorID(event.OperatorId)) {
if !share.Liquidated {
// Skip non-liquidated shares since they are already active.
// A share may become liquidated on OperatorRemoved or ClusterLiquidated events.
// On ValidatorRemoved the share is removed from the storage, so it won't be reactivated on OperatorAdded.
continue

Check warning on line 99 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L95-L99

Added lines #L95 - L99 were not covered by tests
}

var existingOperatorsCount uint64
for _, shareMember := range share.Committee {
if shareMember.Signer == event.OperatorId {
existingOperatorsCount++
continue

Check warning on line 106 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L102-L106

Added lines #L102 - L106 were not covered by tests
}

_, ok, err := eh.nodeStorage.GetOperatorData(txn, shareMember.Signer)
if err != nil {
return fmt.Errorf("get operator data: %w", err)
}

Check warning on line 112 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L109-L112

Added lines #L109 - L112 were not covered by tests

if ok {
existingOperatorsCount++
}

Check warning on line 116 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L114-L116

Added lines #L114 - L116 were not covered by tests
}

if existingOperatorsCount >= share.Quorum() {
share.Liquidated = false
modifiedShares = append(modifiedShares, share)
}

Check warning on line 122 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L119-L122

Added lines #L119 - L122 were not covered by tests
}

if len(modifiedShares) > 0 {
if err := eh.nodeStorage.Shares().Save(txn, modifiedShares...); err != nil {
return fmt.Errorf("save shares: %w", err)
}

Check warning on line 128 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L126-L128

Added lines #L126 - L128 were not covered by tests
}

if bytes.Equal(event.PublicKey, eh.operatorDataStore.GetOperatorData().PublicKey) {
eh.operatorDataStore.SetOperatorData(od)
logger = logger.With(zap.Bool("own_operator", true))
Expand Down Expand Up @@ -120,7 +160,27 @@
fields.Owner(od.OwnerAddress),
)

// This function is currently no-op and it will do nothing. Operator removed event is not used in the current implementation.
if err := eh.nodeStorage.DeleteOperatorData(txn, od.ID); err != nil {
return fmt.Errorf("delete operator data: %w", err)
}

Check warning on line 165 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L164-L165

Added lines #L164 - L165 were not covered by tests

var modifiedShares []*ssvtypes.SSVShare
for _, share := range eh.nodeStorage.Shares().List(txn, registrystorage.ByOperatorID(event.OperatorId)) {
exists, err := eh.nodeStorage.QuorumExists(txn, share.OperatorIDs(), share.Quorum())
if err != nil {
return fmt.Errorf("check if operator exists: %w", err)
}

Check warning on line 172 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L171-L172

Added lines #L171 - L172 were not covered by tests
if !exists {
share.Liquidated = true
modifiedShares = append(modifiedShares, share)
}
}

if len(modifiedShares) > 0 {
if err := eh.nodeStorage.Shares().Save(txn, modifiedShares...); err != nil {
return fmt.Errorf("save shares: %w", err)
}

Check warning on line 182 in eth/eventhandler/handlers.go

View check run for this annotation

Codecov / codecov/patch

eth/eventhandler/handlers.go#L181-L182

Added lines #L181 - L182 were not covered by tests
}

logger.Debug("processed event")
return nil
Expand Down
1 change: 1 addition & 0 deletions message/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
ErrWrongDomain = Error{text: "wrong domain"}
ErrNoShareMetadata = Error{text: "share has no metadata"}
ErrUnknownValidator = Error{text: "unknown validator"}
ErrUnknownOperator = Error{text: "unknown operator"} // TODO: reject?
ErrValidatorLiquidated = Error{text: "validator is liquidated"}
ErrValidatorNotAttesting = Error{text: "validator is not attesting"}
ErrEarlySlotMessage = Error{text: "message was sent before slot starts"}
Expand Down
Loading
Loading