Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: relax indexedChainState to ChainState for retrieval #943

Open
wants to merge 17 commits into
base: master
Choose a base branch
from

Conversation

hopeyen
Copy link
Contributor

@hopeyen hopeyen commented Dec 2, 2024

Why are these changes needed?

Enable retrieval client to run without indexing

Checks

  • I've made sure the tests are passing. Note that there might be a few flaky tests, in that case, please comment that they are not relevant.
  • I've checked the new test coverage and the coverage percentage didn't drop.
  • Testing Strategy
    • Unit tests
    • Integration tests
    • This PR is not tested :(

Copy link
Contributor

@ian-shim ian-shim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move these methods to eth reader and remove this struct?
Not sure why we need a separate struct for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're caching the operator sockets, maybe we should keep this struct separate

// buildSocketMap returns a map from operatorID to socket address for the operators in the operatorsByQuorum
func (cs *ChainState) buildSocketMap(ctx context.Context, operatorsByQuorum core.OperatorStakes) (map[core.OperatorID]string, error) {
socketMap := make(map[core.OperatorID]string)
for _, quorum := range operatorsByQuorum {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the key the quorum and the value map[OperatorIndex]OperatorStake here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes exactly. This loop is to get all the operator ID and their socket from all quorums

Copy link
Contributor

@ian-shim ian-shim Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one way to address this would be to preload cache initially by querying every single operator in the current block. Then for the following requests, query the current set of operators, query the logs by filtering on SocketUpdate event, find any socket updates for existing operators, and refresh sockets for all operators.
Then the subsequent requests will only cost 2 eth calls

@ian-shim ian-shim mentioned this pull request Dec 20, 2024
5 tasks
@@ -38,7 +45,12 @@ func (cs *ChainState) GetOperatorState(ctx context.Context, blockNumber uint, qu
return nil, err
}

return getOperatorState(operatorsByQuorum, uint32(blockNumber))
socketMap, err := cs.buildSocketMap(ctx, operatorsByQuorum)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean whenever we call this method, it will query the chain # operators times?
this seems very inefficient, should we cache this?

Copy link
Contributor Author

@hopeyen hopeyen Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I expect this too. I can add socket map to the ChainState cache and only query if the operator is missing from the socket map. Although, that means we might need to add a refresh or a failover somewhere so that we don't use the old socket addresses after operators make an updater. Lmk what you think

@@ -28,7 +28,14 @@ func (cs *ChainState) GetOperatorStateByOperator(ctx context.Context, blockNumbe
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're caching the operator sockets, maybe we should keep this struct separate

@hopeyen hopeyen force-pushed the hope/distributed-retrieval branch from b8566be to 59b7ff7 Compare December 20, 2024 03:09
@hopeyen hopeyen force-pushed the hope/distributed-retrieval branch 2 times, most recently from d393a7e to 36b9280 Compare January 3, 2025 06:40
@hopeyen hopeyen force-pushed the hope/distributed-retrieval branch from f473ed5 to ac07b0c Compare February 11, 2025 15:43
Tx core.Reader
SocketMap map[core.OperatorID]*string
socketMu sync.Mutex
socketPrevBlockNumber uint32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These struct members could benefit from some docs. E.g. it's not immediately clear from the name socketPrevBlockNumber what this exactly is

}

func NewChainState(tx core.Reader, client common.EthClient) *ChainState {
currentBlockNumber, err := client.BlockByNumber(context.Background(), nil)
// TODO: consider changing function signature to return error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think this would be a good change to make

return getOperatorState(operatorsByQuorum, uint32(blockNumber))
err = cs.refreshSocketMap(ctx, operatorsByQuorum)
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The errors throughout this file would benefit from added context with fmt.Errorf, e.g. block number, operator ID, etc.

@@ -59,7 +85,131 @@ func (cs *ChainState) GetOperatorSocket(ctx context.Context, blockNumber uint, o
return socket, nil
}

func getOperatorState(operatorsByQuorum core.OperatorStakes, blockNumber uint32) (*core.OperatorState, error) {
// buildSocketMap returns a map from operatorID to socket address for the operators in the operatorsByQuorum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc is out of date. It's actually updating a map internally, not returning a map as stated. Should this method be renamed?

missingOperatorIds := make([]core.OperatorID, 0)
for _, quorum := range operatorsByQuorum {
for _, operator := range quorum {
missingOperatorIds = append(missingOperatorIds, operator.OperatorID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused about the logic here.

  1. In this loop, you're collecting ALL operator IDs into a collection, called missingOperatorIds
  2. In buildSocketMap, you fetch the socket for every operator, with GetOperatorSocket
  3. Then, in buildSocketMap, you update cs.SocketMap with all the new values

I thought the purpose of this new cache utility was that you don't need to fetch the socket for every operator every time, but it seems like that's what's happening. You have a comment above that seems to indicate an intention to only add operators to missingOperatorIds if the socket isn't already in the map. Is that logic missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really good catch, thanks!

} else if method.Name == "updateOperatorSocket" {
socket = inputs[0].(string)
} else {
return fmt.Errorf("unknown method filtered for socket update event: %s", method.Name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error is a problem, too. My interpretation is that, if a given event triggers this error, then this method will abort. But the very same event will be processed the next time this is tried, and it would error again. So a single event that triggers this error case would break the entire state object

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should never run into this case 🤔 I am going to let it print a log and skip this case; the block range will be updated and the log won't be processed again. I'm not sure if this is the best handle here

if err != nil {
return err
}
methodSig := calldata[:4]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this and other direct index accessing, I think you need to add checks that the length is actually the expected length. Otherwise, we could have a panic at runtime

} else {
return fmt.Errorf("unknown method filtered for socket update event: %s", method.Name)
}
operatorID := core.OperatorID(log.Topics[1].Bytes())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the length of Bytes need to be checked? OperatorID is expecting 32 bytes

Index: core.OperatorIndex(ind),
Stake: op.Stake,
Index: core.OperatorIndex(ind),
Socket: *socketMap[op.OperatorID],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this an unprotected access of the cs.SocketMap?

}
retrieverServiceServer := retriever.NewServer(config, logger, retrievalClient, cs, chainClient)
// This only start the metrics server; consider unwrapping the function
retrieverServiceServer.Start(context.Background())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think it makes sense to make this change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't seem like I can access metrics with the pointer to retriever service server

@litt3
Copy link
Contributor

litt3 commented Feb 11, 2025

This PR re-adds an old file, contracts/bindings/EigenDABlobVerifier/binding.go. Presumably this was a merge conflict- this file can be deleted

@hopeyen hopeyen force-pushed the hope/distributed-retrieval branch from 71ad17b to f0508b7 Compare February 13, 2025 00:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants