Skip to content

Commit

Permalink
Load generator (#1218)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Feb 5, 2025
1 parent 3879e4f commit 545f0d3
Show file tree
Hide file tree
Showing 7 changed files with 451 additions and 16 deletions.
18 changes: 18 additions & 0 deletions common/testutils/random/test_random.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,24 @@ func (r *TestRandom) Uint64n(n uint64) uint64 {
return r.Uint64() % n
}

// Gaussian generates a random float64 from a Gaussian distribution with the given mean and standard deviation.
func (r *TestRandom) Gaussian(mean float64, stddev float64) float64 {
return r.NormFloat64()*stddev + mean
}

// BoundedGaussian generates a random float64 from a Gaussian distribution with the given mean and standard deviation,
// but bounded by the given min and max values. If a generated value exceeds the bounds, the bound is returned instead.
func (r *TestRandom) BoundedGaussian(mean float64, stddev float64, min float64, max float64) float64 {
val := r.Gaussian(mean, stddev)
if val < min {
return min
}
if val > max {
return max
}
return val
}

var _ io.Reader = &randIOReader{}

// randIOReader is an io.Reader that reads from a random number generator.
Expand Down
170 changes: 170 additions & 0 deletions test/v2/load_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package v2

import (
"context"
"fmt"
"github.com/Layr-Labs/eigenda/common/testutils/random"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/docker/go-units"
"math/rand"
"sync/atomic"
"time"
)

// LoadGeneratorConfig is the configuration for the load generator.
type LoadGeneratorConfig struct {
// The desired number of bytes per second to write.
BytesPerSecond uint64
// The average size of the blobs to write.
AverageBlobSize uint64
// The standard deviation of the blob size.
BlobSizeStdDev uint64
// By default, this utility reads each blob back from each relay once. The number of
// reads per relay is multiplied by this factor. For example, If this is set to 3,
// then each blob is read back from each relay 3 times.
RelayReadAmplification uint64
// By default, this utility reads chunks once. The number of chunk reads is multiplied
// by this factor. If this is set to 3, then chunks are read back 3 times.
ValidatorReadAmplification uint64
// The maximum number of parallel blobs in flight.
MaxParallelism uint64
// The timeout for each blob dispersal.
DispersalTimeout time.Duration
// The quorums to use for the load test.
Quorums []core.QuorumID
}

// DefaultLoadGeneratorConfig returns the default configuration for the load generator.
func DefaultLoadGeneratorConfig() *LoadGeneratorConfig {
return &LoadGeneratorConfig{
BytesPerSecond: 10 * units.MiB,
AverageBlobSize: 1 * units.MiB,
BlobSizeStdDev: 0.5 * units.MiB,
RelayReadAmplification: 3,
ValidatorReadAmplification: 3,
MaxParallelism: 10,
DispersalTimeout: 5 * time.Minute,
Quorums: []core.QuorumID{0, 1},
}
}

type LoadGenerator struct {
ctx context.Context
cancel context.CancelFunc

// The configuration for the load generator.
config *LoadGeneratorConfig
// The test client to use for the load test.
client *TestClient
// The random number generator to use for the load test.
rand *random.TestRandom
// The time between starting each blob submission.
submissionPeriod time.Duration
// The channel to limit the number of parallel blob submissions.
parallelismLimiter chan struct{}
// if true, the load generator is running.
alive atomic.Bool
// The channel to signal when the load generator is finished.
finishedChan chan struct{}
// The metrics for the load generator.
metrics *loadGeneratorMetrics
}

// NewLoadGenerator creates a new LoadGenerator.
func NewLoadGenerator(
config *LoadGeneratorConfig,
client *TestClient,
rand *random.TestRandom) *LoadGenerator {

submissionFrequency := config.BytesPerSecond / config.AverageBlobSize
submissionPeriod := time.Second / time.Duration(submissionFrequency)

parallelismLimiter := make(chan struct{}, config.MaxParallelism)

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

metrics := newLoadGeneratorMetrics(client.metrics.registry)

return &LoadGenerator{
ctx: ctx,
cancel: cancel,
config: config,
client: client,
rand: rand,
submissionPeriod: submissionPeriod,
parallelismLimiter: parallelismLimiter,
alive: atomic.Bool{},
finishedChan: make(chan struct{}),
metrics: metrics,
}
}

// Start starts the load generator. If block is true, this function will block until Stop() or
// the load generator crashes. If block is false, this function will return immediately.
func (l *LoadGenerator) Start(block bool) {
l.alive.Store(true)
l.run()
if block {
<-l.finishedChan
}
}

// Stop stops the load generator.
func (l *LoadGenerator) Stop() {
l.finishedChan <- struct{}{}
l.alive.Store(false)
l.client.Stop()
l.cancel()
}

// run runs the load generator.
func (l *LoadGenerator) run() {
ticker := time.NewTicker(l.submissionPeriod)
for l.alive.Load() {
<-ticker.C
l.parallelismLimiter <- struct{}{}
go l.submitBlob()
}
}

// Submits a single blob to the network. This function does not return until it reads the blob back
// from the network, which may take tens of seconds.
func (l *LoadGenerator) submitBlob() {
ctx, cancel := context.WithTimeout(l.ctx, l.config.DispersalTimeout)
l.metrics.startOperation()
defer func() {
<-l.parallelismLimiter
l.metrics.endOperation()
cancel()
}()

payloadSize := int(l.rand.BoundedGaussian(
float64(l.config.AverageBlobSize),
float64(l.config.BlobSizeStdDev),
1.0,
float64(l.client.Config.MaxBlobSize+1)))
payload := l.rand.Bytes(payloadSize)
paddedPayload := codec.ConvertByPaddingEmptyByte(payload)
if uint64(len(paddedPayload)) > l.client.Config.MaxBlobSize {
paddedPayload = paddedPayload[:l.client.Config.MaxBlobSize]
}

key, err := l.client.DispersePayload(ctx, paddedPayload, l.config.Quorums, rand.Uint32())
if err != nil {
fmt.Printf("failed to disperse blob: %v\n", err)
}
blobCert := l.client.WaitForCertification(ctx, *key, l.config.Quorums)

// Unpad the payload
unpaddedPayload := codec.RemoveEmptyByteFromPaddedBytes(paddedPayload)

// Read the blob from the relays and validators
for i := uint64(0); i < l.config.RelayReadAmplification; i++ {
l.client.ReadBlobFromRelays(ctx, *key, blobCert, unpaddedPayload)
}
for i := uint64(0); i < l.config.ValidatorReadAmplification; i++ {
l.client.ReadBlobFromValidators(ctx, blobCert, l.config.Quorums, unpaddedPayload)
}
}
38 changes: 38 additions & 0 deletions test/v2/load_generator_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package v2

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// loadGeneratorMetrics encapsulates the metrics for the load generator.
type loadGeneratorMetrics struct {
operationsInFlight *prometheus.GaugeVec
// TODO (cody-littley) count successes, failures, and timeouts
}

// newLoadGeneratorMetrics creates a new loadGeneratorMetrics.0
func newLoadGeneratorMetrics(registry *prometheus.Registry) *loadGeneratorMetrics {
operationsInFlight := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "operations_in_flight",
Help: "Number of operations in flight",
},
[]string{},
)

return &loadGeneratorMetrics{
operationsInFlight: operationsInFlight,
}
}

// startOperation should be called when starting the process of dispersing + verifying a blob
func (m *loadGeneratorMetrics) startOperation() {
m.operationsInFlight.WithLabelValues().Inc()
}

// endOperation should be called when finishing the process of dispersing + verifying a blob
func (m *loadGeneratorMetrics) endOperation() {
m.operationsInFlight.WithLabelValues().Dec()
}
28 changes: 28 additions & 0 deletions test/v2/load_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package v2

import (
"github.com/Layr-Labs/eigenda/common/testutils/random"
"github.com/docker/go-units"
"os"
"testing"
)

func TestLightLoad(t *testing.T) {
rand := random.NewTestRandom(t)
c := getClient(t)

config := DefaultLoadGeneratorConfig()
config.AverageBlobSize = 100 * units.KiB
config.BlobSizeStdDev = 50 * units.KiB
config.BytesPerSecond = 100 * units.KiB

generator := NewLoadGenerator(config, c, rand)

signals := make(chan os.Signal)
go func() {
<-signals
generator.Stop()
}()

generator.Start(true)
}
Loading

0 comments on commit 545f0d3

Please sign in to comment.