Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement v2 client GET functionality #972

Merged
merged 45 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
4625e96
Write GET tests
litt3 Dec 9, 2024
f07e820
Merge branch 'master' into client-v2-get
litt3 Dec 10, 2024
885c131
Respond to PR comments
litt3 Dec 10, 2024
6848663
Create new V2 client config
litt3 Dec 10, 2024
a48afb1
Respond to more PR comments
litt3 Dec 11, 2024
225f2a3
Fix failing unit test
litt3 Dec 12, 2024
d265f6a
Merge branch 'master' into client-v2-get
litt3 Dec 12, 2024
e9d91c5
Adopt new package structure
litt3 Dec 12, 2024
dd3c262
Use new test random util
litt3 Dec 12, 2024
88df865
Implement relay call timeout
litt3 Dec 12, 2024
505a1f0
Use correct error join method
litt3 Dec 12, 2024
2b87633
Merge branch 'master' into client-v2-get
litt3 Jan 8, 2025
cf1cd80
Make updates required by upstream changes
litt3 Jan 8, 2025
53893d8
Update how FFT and IFFT are referred to
litt3 Jan 13, 2025
0373dd7
Implement GetPayload
litt3 Jan 13, 2025
826a026
Remove GetBlob, leaving only GetPayload
litt3 Jan 13, 2025
975b6e5
Remove unnecessary codec mock
litt3 Jan 13, 2025
0666d24
Use more reasonable line breaks for logs
litt3 Jan 13, 2025
0a49aa5
Test malicious cert
litt3 Jan 13, 2025
1193ce7
Merge branch 'master' into client-v2-get
litt3 Jan 13, 2025
496e277
Merge branch 'master' into client-v2-get
litt3 Jan 14, 2025
2d392ff
Finish test coverage
litt3 Jan 14, 2025
db51291
Fix commitment length check
litt3 Jan 14, 2025
4f3280c
Merge branch 'master' into client-v2-get
litt3 Jan 16, 2025
aaa1342
Call VerifyBlobV2
litt3 Jan 17, 2025
9be51e6
Simply verify blob
litt3 Jan 17, 2025
cc6b9a1
Merge branch 'master' into client-v2-get
litt3 Jan 17, 2025
ae926c7
Clean up
litt3 Jan 17, 2025
f82d128
Merge branch 'master' into client-v2-get
litt3 Jan 17, 2025
017a48c
Return error from verification method
litt3 Jan 21, 2025
b645370
Merge branch 'master' into client-v2-get
litt3 Jan 21, 2025
03f8018
Address some PR comments
litt3 Jan 21, 2025
ef3944d
Rename methods, and clean up
litt3 Jan 21, 2025
78cab0d
Actually apply fix for poor doc
litt3 Jan 22, 2025
e27d3ea
Fix goroutine safety comment
litt3 Jan 22, 2025
f6126af
Merge branch 'master' into client-v2-get
litt3 Jan 22, 2025
28c3d02
Fix test
litt3 Jan 22, 2025
036a222
Rework polynomial encoding enum, and descriptions
litt3 Jan 22, 2025
7b66df6
Make PR fixes
litt3 Jan 23, 2025
ad3dc97
Move conversion utils
litt3 Jan 23, 2025
6930a47
Remove GetCodec
litt3 Jan 23, 2025
ec190ca
Merge branch 'master' into client-v2-get
litt3 Jan 24, 2025
d27c463
Merge branch 'master' into client-v2-get
litt3 Jan 24, 2025
840ca9a
Fix merge
litt3 Jan 24, 2025
16f0c74
Add additional comment about random
litt3 Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions api/clients/codecs/mock/blob_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package mock

import mock "github.com/stretchr/testify/mock"

// BlobCodec is an autogenerated mock type for the BlobCodec type.
//
// It embeds mock.Mock, whose Called method the mock methods on this type use to record a call and look up
// the return values configured for it.
type BlobCodec struct {
mock.Mock
}

// DecodeBlob is the mocked counterpart of BlobCodec.DecodeBlob, with given fields: encodedData
//
// The call is recorded through Called, and the configured return values are resolved as follows:
//   - a func([]byte) ([]byte, error) in slot 0 is invoked and supplies both results;
//   - otherwise slot 0 may hold a func([]byte) []byte, a []byte, or nil, and slot 1 may hold a
//     func([]byte) error or an error value.
//
// It panics when no return values were configured for the call.
func (_m *BlobCodec) DecodeBlob(encodedData []byte) ([]byte, error) {
	ret := _m.Called(encodedData)
	if len(ret) == 0 {
		panic("no return value specified for DecodeBlob")
	}

	var decoded []byte
	switch first := ret.Get(0).(type) {
	case func([]byte) ([]byte, error):
		// a single function provides both return values
		return first(encodedData)
	case func([]byte) []byte:
		decoded = first(encodedData)
	case nil:
		// nothing configured for the payload; decoded stays nil
	default:
		// any other value must be a []byte; the assertion panics otherwise
		decoded = first.([]byte)
	}

	if errFn, ok := ret.Get(1).(func([]byte) error); ok {
		return decoded, errFn(encodedData)
	}
	return decoded, ret.Error(1)
}

// EncodeBlob is the mocked counterpart of BlobCodec.EncodeBlob, with given fields: rawData
//
// The call is recorded through Called, and the configured return values are resolved as follows:
//   - a func([]byte) ([]byte, error) in slot 0 is invoked and supplies both results;
//   - otherwise slot 0 may hold a func([]byte) []byte, a []byte, or nil, and slot 1 may hold a
//     func([]byte) error or an error value.
//
// It panics when no return values were configured for the call.
func (_m *BlobCodec) EncodeBlob(rawData []byte) ([]byte, error) {
	ret := _m.Called(rawData)
	if len(ret) == 0 {
		panic("no return value specified for EncodeBlob")
	}

	var encoded []byte
	switch first := ret.Get(0).(type) {
	case func([]byte) ([]byte, error):
		// a single function provides both return values
		return first(rawData)
	case func([]byte) []byte:
		encoded = first(rawData)
	case nil:
		// nothing configured for the payload; encoded stays nil
	default:
		// any other value must be a []byte; the assertion panics otherwise
		encoded = first.([]byte)
	}

	if errFn, ok := ret.Get(1).(func([]byte) error); ok {
		return encoded, errFn(rawData)
	}
	return encoded, ret.Error(1)
}
2 changes: 1 addition & 1 deletion api/clients/mock/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewRelayClient() *MockRelayClient {
}

func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
args := c.Called(blobKey)
args := c.Called(ctx, relayKey, blobKey)
if args.Get(0) == nil {
return nil, args.Error(1)
}
Expand Down
33 changes: 33 additions & 0 deletions api/clients/v2/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package v2

import (
"github.com/Layr-Labs/eigenda/api/clients/codecs"
"time"
)

// VerificationMode is an enum that represents the different ways that a blob may be encoded/decoded between
// the client and the disperser.
//
// createCodec maps each value to the codec implementation the client uses.
type VerificationMode uint

const (
// TODO: write good docs here for IFFT and NoIFFT (I need to update my understanding to be able to write this)
samlaf marked this conversation as resolved.
Show resolved Hide resolved
// IFFT selects the codec built by codecs.NewIFFTCodec. It is the zero value of VerificationMode, so a config
// that leaves the mode unset uses IFFT.
IFFT VerificationMode = iota
// NoIFFT selects the codec built by codecs.NewNoIFFTCodec.
NoIFFT
)

// EigenDAClientConfig contains configuration values for EigenDAClient.
type EigenDAClientConfig struct {
// The blob encoding version to use when writing and reading blobs. createCodec resolves it to a low level
// codec via codecs.BlobEncodingVersionToCodec, and fails client construction if that lookup returns an error.
BlobEncodingVersion codecs.BlobEncodingVersion
samlaf marked this conversation as resolved.
Show resolved Hide resolved

// If PointVerificationMode is IFFT, then the client codec will do an IFFT on blobs before they are dispersed, and
// will do an FFT on blobs after receiving them. This makes it possible to open points on the KZG commitment to prove
// that the field elements correspond to the commitment.
//
// If PointVerificationMode is NoIFFT, the blob must be supplied in its entirety, to perform a verification
// that any part of the data matches the KZG commitment.
PointVerificationMode VerificationMode

// The timeout duration for relay calls. It is applied separately to each relay attempt, by wrapping the
// caller's context with context.WithTimeout in getBlobWithTimeout.
//
// NOTE(review): the value is passed to context.WithTimeout unchanged, so a zero or negative duration looks
// like it would make every relay call start with an already expired context — confirm callers always set it.
samlaf marked this conversation as resolved.
Show resolved Hide resolved
RelayTimeout time.Duration
}
160 changes: 160 additions & 0 deletions api/clients/v2/eigenda_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package v2

import (
"context"
"errors"
"fmt"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/clients/codecs"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/cockroachdb/errors/join"
"math/rand"
)

// EigenDAClient provides the ability to get blobs from the relay subsystem, and to send new blobs to the disperser.
//
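// This struct is not safe for concurrent use by multiple goroutines: GetBlob draws from the shared *rand.Rand, and a Rand built on rand.NewSource is not goroutine safe.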
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it not goroutine safe? Pretty sure this is used in the proxy which can receive parallel requests that are treated in separate goroutines. Do we have a race condition right now?

Copy link
Contributor Author

@litt3 litt3 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This client wraps a RelayClient, which doesn't specify thread safety. So I started with "not threadsafe", for the sake of caution

But more fundamentally, my question would be if the proxy actually needs parallelism here. The requests are treated in separate goroutines, but afaict they are being served by a single Manager, which could synchronize access to this client.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to synchronize access to the client in proxy though! Client requests can take a long time to deal with because of multiple network routines. Plus when dispersing you need to wait for the CERTIFIED status to be ready which can take up to 10 seconds.

We should be much more fine-grained with synchronization. I would suggest we put a mutex around rand (would say that we don't even care if theres a race on this source of randomness, but perhaps it can actually cause some weird stuff to happen...?).

Also seems like accountant in DisperserClient is thread safe (see this discussion) so I think we should be fine for concurrent support with these 2, but let's do a quick check to make sure there's nothing else we're missing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed synchronously, there doesn't appear to be anything that would make RelayClient unsafe.

We also discussed that synchronizing in the proxy is not acceptable, due to very long times for dispersal (~10 seconds)

I've updated this comment accordingly e27d3eaa

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait but its not! Missing mutex around rand

Copy link
Contributor Author

@litt3 litt3 Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capturing our synchronous discussion

  • the way that random is currently being used in this PR is goroutine safe
  • not all methods on Rand are guaranteed safe

I added an additional comment about this 16f0c748

// EigenDAClient holds the collaborators used to fetch blobs from relays and decode them.
type EigenDAClient struct {
// log receives a warning for every relay attempt that GetBlob has to skip
log logging.Logger
// doesn't need to be cryptographically secure, as it's only used to distribute load across relays
random *rand.Rand
// config supplies RelayTimeout, the per-call bound applied in getBlobWithTimeout
config *EigenDAClientConfig
// codec decodes the raw bytes returned by a relay, and is exposed through GetCodec
codec codecs.BlobCodec
// relayClient performs the GetBlob calls against relays, and is closed by Close
relayClient clients.RelayClient
}

// BuildEigenDAClient builds an EigenDAClient from config structs.
//
// The codec is created before the relay client. Codec creation only inspects config, whereas the relay client
// exposes Close and is therefore treated as a resource: building it last means that a bad codec configuration
// can no longer return an error while leaving a freshly created relay client unclosed.
//
// Parameters:
//   - log: logger handed to both the relay client and the EigenDAClient
//   - config: client configuration; must not be nil
//   - relayClientConfig: configuration forwarded to clients.NewRelayClient
//
// Returns the assembled client, or an error if config is nil, the codec cannot be created, or the relay client
// cannot be created. Underlying errors are wrapped with %w.
func BuildEigenDAClient(
	log logging.Logger,
	config *EigenDAClientConfig,
	relayClientConfig *clients.RelayClientConfig) (*EigenDAClient, error) {

	// createCodec dereferences config, so reject nil here instead of panicking
	if config == nil {
		return nil, errors.New("eigenda client config is nil")
	}

	codec, err := createCodec(config)
	if err != nil {
		return nil, fmt.Errorf("create codec: %w", err)
	}

	relayClient, err := clients.NewRelayClient(relayClientConfig, log)
	if err != nil {
		return nil, fmt.Errorf("new relay client: %w", err)
	}

	// the random source only spreads load across relays, so seeding it from the global generator is sufficient
	return NewEigenDAClient(log, rand.New(rand.NewSource(rand.Int63())), config, relayClient, codec)
}

// NewEigenDAClient wires together an EigenDAClient out of collaborators that the caller has already constructed
// and initialized. The arguments are stored as given; nothing is validated or copied.
//
// The returned error is always nil.
func NewEigenDAClient(
	log logging.Logger,
	random *rand.Rand,
	config *EigenDAClientConfig,
	relayClient clients.RelayClient,
	codec codecs.BlobCodec) (*EigenDAClient, error) {

	client := new(EigenDAClient)
	client.log = log
	client.random = random
	client.config = config
	client.codec = codec
	client.relayClient = relayClient

	return client, nil
}

// GetBlob iteratively attempts to retrieve a given blob with key blobKey from the relays listed in the blobCertificate.
//
// The relays are attempted in random order. Each attempt is bounded by config.RelayTimeout (see
// getBlobWithTimeout). A relay is skipped, with a warning logged, if the call returns an error, if it returns
// zero bytes, or if the returned bytes cannot be decoded by the client codec.
//
// The returned blob is decoded.
//
// An error is returned if the certificate lists no relay keys, or if no relay yields a decodable blob.
func (c *EigenDAClient) GetBlob(
ctx context.Context,
blobKey core.BlobKey,
blobCertificate core.BlobCertificate) ([]byte, error) {

relayKeyCount := len(blobCertificate.RelayKeys)

if relayKeyCount == 0 {
return nil, errors.New("relay key count is zero")
}
samlaf marked this conversation as resolved.
Show resolved Hide resolved

// create a randomized array of indices, so that it isn't always the first relay in the list which gets hit
indices := c.random.Perm(relayKeyCount)

// TODO (litt3): consider creating a utility which can deprioritize relays that fail to respond (or respond
// maliciously), and prioritize relays with lower latencies (while possibly still reaching out to lower priority
// relays with a small but non-zero probability)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, utility should also prioritize relays with lower latencies (although perhaps it should still reach out to lower priority relays with small but non-zero probability).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded TODO to mention prioritizing low latency relays


// iterate over relays in random order, until we are able to get the blob from someone
//
// NOTE(review): ctx is not inspected between attempts, so once the caller cancels, every remaining relay is
// still tried (presumably failing fast) and the generic "unable to retrieve" error is returned rather than
// the context error — consider returning ctx.Err() early.
for _, val := range indices {
relayKey := blobCertificate.RelayKeys[val]

data, err := c.getBlobWithTimeout(ctx, relayKey, blobKey)

// if GetBlob returned an error, try calling a different relay
if err != nil {
c.log.Warn("blob couldn't be retrieved from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

// An honest relay should never send an empty blob
if len(data) == 0 {
c.log.Warn("blob received from relay had length 0", "blobKey", blobKey, "relayKey", relayKey)
continue
}

// An honest relay should never send a blob which cannot be decoded
decodedData, err := c.codec.DecodeBlob(data)
if err != nil {
c.log.Warn("error decoding blob from relay", "blobKey", blobKey, "relayKey", relayKey, "error", err)
continue
}

return decodedData, nil
}

// every relay either errored, returned nothing, or returned undecodable data
return nil, fmt.Errorf("unable to retrieve blob from any relay. relay count: %d", relayKeyCount)
}

// getBlobWithTimeout asks a single relay for the blob identified by blobKey, bounding the call by
// config.RelayTimeout.
//
// The timeout is layered on top of ctx, so cancellation or an earlier deadline on ctx still applies. The derived
// context is released when this method returns.
func (c *EigenDAClient) getBlobWithTimeout(
	ctx context.Context,
	relayKey core.RelayKey,
	blobKey core.BlobKey) ([]byte, error) {

	relayCtx, release := context.WithTimeout(ctx, c.config.RelayTimeout)
	defer release()

	blob, err := c.relayClient.GetBlob(relayCtx, relayKey, blobKey)
	return blob, err
}

// GetCodec returns the codec the client uses for encoding and decoding blobs.
//
// The value returned is the codec held by the client itself, not a copy.
func (c *EigenDAClient) GetCodec() codecs.BlobCodec {
return c.codec
}

// Close shuts down every internal client owned by this EigenDAClient. It makes a best-effort attempt to close
// all of them, even if some closes fail.
//
// Errors returned while closing internal clients are joined together and returned as one error.
//
// This method should only be called once.
func (c *EigenDAClient) Close() error {
	// TODO: this is using join, since there will be more subcomponents requiring closing after adding PUT functionality
	return join.Join(c.relayClient.Close())
}

// createCodec builds the blob codec described by the client config.
//
// config.BlobEncodingVersion picks the low level codec, which is then wrapped according to
// config.PointVerificationMode. An error is returned if the encoding version cannot be resolved to a codec, or
// if the verification mode is not one of the known values.
func createCodec(config *EigenDAClientConfig) (codecs.BlobCodec, error) {
	baseCodec, err := codecs.BlobEncodingVersionToCodec(config.BlobEncodingVersion)
	if err != nil {
		return nil, fmt.Errorf("create low level codec: %w", err)
	}

	switch mode := config.PointVerificationMode; mode {
	case IFFT:
		return codecs.NewIFFTCodec(baseCodec), nil
	case NoIFFT:
		return codecs.NewNoIFFTCodec(baseCodec), nil
	default:
		return nil, fmt.Errorf("unsupported point verification mode: %d", mode)
	}
}
Loading
Loading