Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement v2 client GET functionality #972

Merged
merged 45 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
4625e96
Write GET tests
litt3 Dec 9, 2024
f07e820
Merge branch 'master' into client-v2-get
litt3 Dec 10, 2024
885c131
Respond to PR comments
litt3 Dec 10, 2024
6848663
Create new V2 client config
litt3 Dec 10, 2024
a48afb1
Respond to more PR comments
litt3 Dec 11, 2024
225f2a3
Fix failing unit test
litt3 Dec 12, 2024
d265f6a
Merge branch 'master' into client-v2-get
litt3 Dec 12, 2024
e9d91c5
Adopt new package structure
litt3 Dec 12, 2024
dd3c262
Use new test random util
litt3 Dec 12, 2024
88df865
Implement relay call timeout
litt3 Dec 12, 2024
505a1f0
Use correct error join method
litt3 Dec 12, 2024
2b87633
Merge branch 'master' into client-v2-get
litt3 Jan 8, 2025
cf1cd80
Make updates required by upstream changes
litt3 Jan 8, 2025
53893d8
Update how FFT and IFFT are referred to
litt3 Jan 13, 2025
0373dd7
Implement GetPayload
litt3 Jan 13, 2025
826a026
Remove GetBlob, leaving only GetPayload
litt3 Jan 13, 2025
975b6e5
Remove unnecessary codec mock
litt3 Jan 13, 2025
0666d24
Use more reasonable line breaks for logs
litt3 Jan 13, 2025
0a49aa5
Test malicious cert
litt3 Jan 13, 2025
1193ce7
Merge branch 'master' into client-v2-get
litt3 Jan 13, 2025
496e277
Merge branch 'master' into client-v2-get
litt3 Jan 14, 2025
2d392ff
Finish test coverage
litt3 Jan 14, 2025
db51291
Fix commitment length check
litt3 Jan 14, 2025
4f3280c
Merge branch 'master' into client-v2-get
litt3 Jan 16, 2025
aaa1342
Call VerifyBlobV2
litt3 Jan 17, 2025
9be51e6
Simply verify blob
litt3 Jan 17, 2025
cc6b9a1
Merge branch 'master' into client-v2-get
litt3 Jan 17, 2025
ae926c7
Clean up
litt3 Jan 17, 2025
f82d128
Merge branch 'master' into client-v2-get
litt3 Jan 17, 2025
017a48c
Return error from verification method
litt3 Jan 21, 2025
b645370
Merge branch 'master' into client-v2-get
litt3 Jan 21, 2025
03f8018
Address some PR comments
litt3 Jan 21, 2025
ef3944d
Rename methods, and clean up
litt3 Jan 21, 2025
78cab0d
Actually apply fix for poor doc
litt3 Jan 22, 2025
e27d3ea
Fix goroutine safety comment
litt3 Jan 22, 2025
f6126af
Merge branch 'master' into client-v2-get
litt3 Jan 22, 2025
28c3d02
Fix test
litt3 Jan 22, 2025
036a222
Rework polynomial encoding enum, and descriptions
litt3 Jan 22, 2025
7b66df6
Make PR fixes
litt3 Jan 23, 2025
ad3dc97
Move conversion utils
litt3 Jan 23, 2025
6930a47
Remove GetCodec
litt3 Jan 23, 2025
ec190ca
Merge branch 'master' into client-v2-get
litt3 Jan 24, 2025
d27c463
Merge branch 'master' into client-v2-get
litt3 Jan 24, 2025
840ca9a
Fix merge
litt3 Jan 24, 2025
16f0c74
Add additional comment about random
litt3 Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions api/clients/codecs/mock/BlobCodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package mock
litt3 marked this conversation as resolved.
Show resolved Hide resolved

import mock "github.com/stretchr/testify/mock"

// BlobCodec is an autogenerated mock type for the BlobCodec type
type BlobCodec struct {
mock.Mock
}

// DecodeBlob provides a mock function with given fields: encodedData
func (_m *BlobCodec) DecodeBlob(encodedData []byte) ([]byte, error) {
ret := _m.Called(encodedData)

if len(ret) == 0 {
litt3 marked this conversation as resolved.
Show resolved Hide resolved
panic("no return value specified for DecodeBlob")
}

var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok {
return rf(encodedData)
}
if rf, ok := ret.Get(0).(func([]byte) []byte); ok {
r0 = rf(encodedData)
} else {
if ret.Get(0) != nil {
litt3 marked this conversation as resolved.
Show resolved Hide resolved
r0 = ret.Get(0).([]byte)
litt3 marked this conversation as resolved.
Show resolved Hide resolved
}
}

if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(encodedData)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// EncodeBlob provides a mock function with given fields: rawData
func (_m *BlobCodec) EncodeBlob(rawData []byte) ([]byte, error) {
ret := _m.Called(rawData)

if len(ret) == 0 {
panic("no return value specified for EncodeBlob")
}

var r0 []byte
var r1 error
if rf, ok := ret.Get(0).(func([]byte) ([]byte, error)); ok {
return rf(rawData)
}
if rf, ok := ret.Get(0).(func([]byte) []byte); ok {
r0 = rf(rawData)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}

if rf, ok := ret.Get(1).(func([]byte) error); ok {
r1 = rf(rawData)
} else {
r1 = ret.Error(1)
}

return r0, r1
}
142 changes: 142 additions & 0 deletions api/clients/eigenda_client_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package clients

import (
"context"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients/codecs"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/hashicorp/go-multierror"
"math/rand"
)

// EigenDAClientV2 provides the ability to get blobs from the relay subsystem, and to send new blobs to the disperser.
type EigenDAClientV2 interface {
samlaf marked this conversation as resolved.
Show resolved Hide resolved
// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate.
GetBlob(ctx context.Context, blobKey corev2.BlobKey, blobCertificate corev2.BlobCertificate) ([]byte, error)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
GetCodec() codecs.BlobCodec
Close() error
samlaf marked this conversation as resolved.
Show resolved Hide resolved
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}

type eigenDAClientV2 struct {
samlaf marked this conversation as resolved.
Show resolved Hide resolved
clientConfig EigenDAClientConfig
samlaf marked this conversation as resolved.
Show resolved Hide resolved
log logging.Logger
relayClient RelayClient
codec codecs.BlobCodec
}

var _ EigenDAClientV2 = &eigenDAClientV2{}

// BuildEigenDAClientV2 builds an EigenDAClientV2 from config structs.
func BuildEigenDAClientV2(
log logging.Logger,
clientConfig EigenDAClientConfig,
relayClientConfig RelayClientConfig) (EigenDAClientV2, error) {

err := clientConfig.CheckAndSetDefaults()
if err != nil {
return nil, err
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}

relayClient, err := NewRelayClient(&relayClientConfig, log)
samlaf marked this conversation as resolved.
Show resolved Hide resolved

lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(clientConfig.PutBlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("create low level codec: %w", err)
}
var codec codecs.BlobCodec
if clientConfig.DisablePointVerificationMode {
codec = codecs.NewNoIFFTCodec(lowLevelCodec)
} else {
codec = codecs.NewIFFTCodec(lowLevelCodec)
}
samlaf marked this conversation as resolved.
Show resolved Hide resolved

return NewEigenDAClientV2(log, clientConfig, relayClient, codec)
}

// NewEigenDAClientV2 assembles an EigenDAClientV2 from subcomponents that have already been constructed and initialized.
func NewEigenDAClientV2(
log logging.Logger,
clientConfig EigenDAClientConfig,
relayClient RelayClient,
codec codecs.BlobCodec) (EigenDAClientV2, error) {

return &eigenDAClientV2{
clientConfig: clientConfig,
log: log,
relayClient: relayClient,
codec: codec,
}, nil
}

// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate.
//
// The relays are attempted in random order.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why this choice. How is the relayKey list populated? Shouldnt we let the person populating be able to dictate some preference on the relays by iterating through them in normal order?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My logic (which may be based on incorrect assumptions) is that, since the relayKey list is part of the cert, the order would the same for every client.

It seems we wouldn't want all clients to attempt the first relay in the list, rather we should distribute load across all available relays.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. I think this makes sense. Although now I just realized I'm confused myself. @ian-shim does this make sense? Are the relayKeys meant to be for data replication or sharding? Aka do we need to hit all the relayKeys to get all the blobs, or any ONE should do?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The disperser implementation itself shuffles the relay keys, so attempting each relay in order is actually fine. But it's good that we're not assuming that relay keys aren't ordered in any particular way.

Aka do we need to hit all the relayKeys to get all the blobs, or any ONE should do?

Any one should do!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attempting each relay in order is actually fine

@ian-shim The randomization is done while creating the cert, and all clients will be given the same cert from the disperser, right?

So, for a given blob that can be served by relays [X, Y, Z], if all clients were to attempt the relays in order, then they all would try to get the blob from relay X, and relays [Y, Z] wouldn't receive any load.

Is this the correct understanding, and if so, are you saying this behavior would be ok?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's also how I understand it. Agree that your shuffling is better because if we assume that all validator nodes are perfectly in sync (which is rare though, and this not being true would validate ian's approach), then they would all be querying the same relay (first one for that blob), then all querying another one (for the next blob), etc.

Agree that your approach is better, but if the nodes are often out of sync (even by a few blocks only), then that would probably be sufficient to distribute the load across the relays (assuming there's only very few relays).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the relayKeys meant to be for data replication or sharding?

why-not-both-why-not

The primary purpose of having different relay IDs is sharding. Not all relays have all blobs, allowing us to grow the capacity of the relay cohort by adding more relays and assigning fewer blobs to each. But data replication is also something we can achieve by leveraging the relay IDs. By assigning each blob to multiple relays (but not all relays), we can ensure that the failure of individual relays does not result in data unavailability.

//
// The returned blob is decoded.
func (c *eigenDAClientV2) GetBlob(ctx context.Context, blobKey corev2.BlobKey, blobCertificate corev2.BlobCertificate) ([]byte, error) {
// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit
random := rand.New(rand.NewSource(rand.Int63()))
samlaf marked this conversation as resolved.
Show resolved Hide resolved
relayKeyCount := len(blobCertificate.RelayKeys)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
var indices []int
litt3 marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < relayKeyCount; i++ {
indices = append(indices, i)
}
random.Shuffle(len(indices), func(i int, j int) {
samlaf marked this conversation as resolved.
Show resolved Hide resolved
indices[i], indices[j] = indices[j], indices[i]
})

// TODO (litt3): consider creating a utility which can deprioritize relays that fail to respond (or respond maliciously)
litt3 marked this conversation as resolved.
Show resolved Hide resolved

// iterate over relays in random order, until we are able to get the blob from someone
for _, val := range indices {
relayKey := blobCertificate.RelayKeys[val]

data, err := c.relayClient.GetBlob(ctx, relayKey, blobKey)

// if GetBlob returned an error, try calling a different relay
if err != nil {
// TODO: should this log type be downgraded to debug to avoid log spam? I'm not sure how frequent retrieval
// from a relay will fail in practice (?)
c.log.Info("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
continue
}

// An honest relay should never send an empty blob
if len(data) == 0 {
c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey)
continue
}

// An honest relay should never send a blob which cannot be decoded
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To expand these invariants: an honest relay should never send a blob which doesn't respect its polynomial commitments. The thing is though this check would get caught upstream (i.e, within proxy directly) and probably cause the request to fail. The proxy client would trigger a retry which would probably route to another relay.

this isn't a big problem rn and we can just document it somewhere for circle back sometime in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason not to check this invariant here, included in this PR? Seems like it wouldn't be hard to add

Copy link
Contributor Author

@litt3 litt3 Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commitments are being checked in the most recent iteration.

decodedData, err := c.codec.DecodeBlob(data)
if err != nil {
c.log.Warn("error decoding blob", "blobKey", blobKey, "relayKey", relayKey)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
continue
}

return decodedData, nil
}

return nil, fmt.Errorf("unable to retrieve blob from any relay")
litt3 marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *eigenDAClientV2) GetCodec() codecs.BlobCodec {
return c.codec
}

func (c *eigenDAClientV2) Close() error {
var errList *multierror.Error

// TODO: this is using a multierror, since there will be more subcomponents requiring closing after adding PUT functionality
samlaf marked this conversation as resolved.
Show resolved Hide resolved
relayClientErr := c.relayClient.Close()
if relayClientErr != nil {
errList = multierror.Append(errList, relayClientErr)
}

if errList != nil {
return errList.ErrorOrNil()
}

return nil
}
Loading
Loading