Skip to content

Commit 545f0d3

Browse files
authored
Load generator (Layr-Labs#1218)
Signed-off-by: Cody Littley <[email protected]>
1 parent 3879e4f commit 545f0d3

File tree

7 files changed

+451
-16
lines changed

7 files changed

+451
-16
lines changed

common/testutils/random/test_random.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,24 @@ func (r *TestRandom) Uint64n(n uint64) uint64 {
103103
return r.Uint64() % n
104104
}
105105

106+
// Gaussian generates a random float64 from a Gaussian distribution with the given mean and standard deviation.
107+
func (r *TestRandom) Gaussian(mean float64, stddev float64) float64 {
108+
return r.NormFloat64()*stddev + mean
109+
}
110+
111+
// BoundedGaussian generates a random float64 from a Gaussian distribution with the given mean and standard deviation,
112+
// but bounded by the given min and max values. If a generated value exceeds the bounds, the bound is returned instead.
113+
func (r *TestRandom) BoundedGaussian(mean float64, stddev float64, min float64, max float64) float64 {
114+
val := r.Gaussian(mean, stddev)
115+
if val < min {
116+
return min
117+
}
118+
if val > max {
119+
return max
120+
}
121+
return val
122+
}
123+
106124
var _ io.Reader = &randIOReader{}
107125

108126
// randIOReader is an io.Reader that reads from a random number generator.

test/v2/load_generator.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package v2
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/Layr-Labs/eigenda/common/testutils/random"
7+
"github.com/Layr-Labs/eigenda/core"
8+
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
9+
"github.com/docker/go-units"
10+
"math/rand"
11+
"sync/atomic"
12+
"time"
13+
)
14+
15+
// LoadGeneratorConfig is the configuration for the load generator.
16+
type LoadGeneratorConfig struct {
17+
// The desired number of bytes per second to write.
18+
BytesPerSecond uint64
19+
// The average size of the blobs to write.
20+
AverageBlobSize uint64
21+
// The standard deviation of the blob size.
22+
BlobSizeStdDev uint64
23+
// By default, this utility reads each blob back from each relay once. The number of
24+
// reads per relay is multiplied by this factor. For example, If this is set to 3,
25+
// then each blob is read back from each relay 3 times.
26+
RelayReadAmplification uint64
27+
// By default, this utility reads chunks once. The number of chunk reads is multiplied
28+
// by this factor. If this is set to 3, then chunks are read back 3 times.
29+
ValidatorReadAmplification uint64
30+
// The maximum number of parallel blobs in flight.
31+
MaxParallelism uint64
32+
// The timeout for each blob dispersal.
33+
DispersalTimeout time.Duration
34+
// The quorums to use for the load test.
35+
Quorums []core.QuorumID
36+
}
37+
38+
// DefaultLoadGeneratorConfig returns the default configuration for the load generator.
39+
func DefaultLoadGeneratorConfig() *LoadGeneratorConfig {
40+
return &LoadGeneratorConfig{
41+
BytesPerSecond: 10 * units.MiB,
42+
AverageBlobSize: 1 * units.MiB,
43+
BlobSizeStdDev: 0.5 * units.MiB,
44+
RelayReadAmplification: 3,
45+
ValidatorReadAmplification: 3,
46+
MaxParallelism: 10,
47+
DispersalTimeout: 5 * time.Minute,
48+
Quorums: []core.QuorumID{0, 1},
49+
}
50+
}
51+
52+
type LoadGenerator struct {
53+
ctx context.Context
54+
cancel context.CancelFunc
55+
56+
// The configuration for the load generator.
57+
config *LoadGeneratorConfig
58+
// The test client to use for the load test.
59+
client *TestClient
60+
// The random number generator to use for the load test.
61+
rand *random.TestRandom
62+
// The time between starting each blob submission.
63+
submissionPeriod time.Duration
64+
// The channel to limit the number of parallel blob submissions.
65+
parallelismLimiter chan struct{}
66+
// if true, the load generator is running.
67+
alive atomic.Bool
68+
// The channel to signal when the load generator is finished.
69+
finishedChan chan struct{}
70+
// The metrics for the load generator.
71+
metrics *loadGeneratorMetrics
72+
}
73+
74+
// NewLoadGenerator creates a new LoadGenerator.
75+
func NewLoadGenerator(
76+
config *LoadGeneratorConfig,
77+
client *TestClient,
78+
rand *random.TestRandom) *LoadGenerator {
79+
80+
submissionFrequency := config.BytesPerSecond / config.AverageBlobSize
81+
submissionPeriod := time.Second / time.Duration(submissionFrequency)
82+
83+
parallelismLimiter := make(chan struct{}, config.MaxParallelism)
84+
85+
ctx := context.Background()
86+
ctx, cancel := context.WithCancel(ctx)
87+
88+
metrics := newLoadGeneratorMetrics(client.metrics.registry)
89+
90+
return &LoadGenerator{
91+
ctx: ctx,
92+
cancel: cancel,
93+
config: config,
94+
client: client,
95+
rand: rand,
96+
submissionPeriod: submissionPeriod,
97+
parallelismLimiter: parallelismLimiter,
98+
alive: atomic.Bool{},
99+
finishedChan: make(chan struct{}),
100+
metrics: metrics,
101+
}
102+
}
103+
104+
// Start starts the load generator. If block is true, this function will block until Stop() or
105+
// the load generator crashes. If block is false, this function will return immediately.
106+
func (l *LoadGenerator) Start(block bool) {
107+
l.alive.Store(true)
108+
l.run()
109+
if block {
110+
<-l.finishedChan
111+
}
112+
}
113+
114+
// Stop stops the load generator.
115+
func (l *LoadGenerator) Stop() {
116+
l.finishedChan <- struct{}{}
117+
l.alive.Store(false)
118+
l.client.Stop()
119+
l.cancel()
120+
}
121+
122+
// run runs the load generator.
123+
func (l *LoadGenerator) run() {
124+
ticker := time.NewTicker(l.submissionPeriod)
125+
for l.alive.Load() {
126+
<-ticker.C
127+
l.parallelismLimiter <- struct{}{}
128+
go l.submitBlob()
129+
}
130+
}
131+
132+
// Submits a single blob to the network. This function does not return until it reads the blob back
133+
// from the network, which may take tens of seconds.
134+
func (l *LoadGenerator) submitBlob() {
135+
ctx, cancel := context.WithTimeout(l.ctx, l.config.DispersalTimeout)
136+
l.metrics.startOperation()
137+
defer func() {
138+
<-l.parallelismLimiter
139+
l.metrics.endOperation()
140+
cancel()
141+
}()
142+
143+
payloadSize := int(l.rand.BoundedGaussian(
144+
float64(l.config.AverageBlobSize),
145+
float64(l.config.BlobSizeStdDev),
146+
1.0,
147+
float64(l.client.Config.MaxBlobSize+1)))
148+
payload := l.rand.Bytes(payloadSize)
149+
paddedPayload := codec.ConvertByPaddingEmptyByte(payload)
150+
if uint64(len(paddedPayload)) > l.client.Config.MaxBlobSize {
151+
paddedPayload = paddedPayload[:l.client.Config.MaxBlobSize]
152+
}
153+
154+
key, err := l.client.DispersePayload(ctx, paddedPayload, l.config.Quorums, rand.Uint32())
155+
if err != nil {
156+
fmt.Printf("failed to disperse blob: %v\n", err)
157+
}
158+
blobCert := l.client.WaitForCertification(ctx, *key, l.config.Quorums)
159+
160+
// Unpad the payload
161+
unpaddedPayload := codec.RemoveEmptyByteFromPaddedBytes(paddedPayload)
162+
163+
// Read the blob from the relays and validators
164+
for i := uint64(0); i < l.config.RelayReadAmplification; i++ {
165+
l.client.ReadBlobFromRelays(ctx, *key, blobCert, unpaddedPayload)
166+
}
167+
for i := uint64(0); i < l.config.ValidatorReadAmplification; i++ {
168+
l.client.ReadBlobFromValidators(ctx, blobCert, l.config.Quorums, unpaddedPayload)
169+
}
170+
}

test/v2/load_generator_metrics.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package v2
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"github.com/prometheus/client_golang/prometheus/promauto"
6+
)
7+
8+
// loadGeneratorMetrics encapsulates the metrics for the load generator.
9+
type loadGeneratorMetrics struct {
10+
operationsInFlight *prometheus.GaugeVec
11+
// TODO (cody-littley) count successes, failures, and timeouts
12+
}
13+
14+
// newLoadGeneratorMetrics creates a new loadGeneratorMetrics.0
15+
func newLoadGeneratorMetrics(registry *prometheus.Registry) *loadGeneratorMetrics {
16+
operationsInFlight := promauto.With(registry).NewGaugeVec(
17+
prometheus.GaugeOpts{
18+
Namespace: namespace,
19+
Name: "operations_in_flight",
20+
Help: "Number of operations in flight",
21+
},
22+
[]string{},
23+
)
24+
25+
return &loadGeneratorMetrics{
26+
operationsInFlight: operationsInFlight,
27+
}
28+
}
29+
30+
// startOperation should be called when starting the process of dispersing + verifying a blob
31+
func (m *loadGeneratorMetrics) startOperation() {
32+
m.operationsInFlight.WithLabelValues().Inc()
33+
}
34+
35+
// endOperation should be called when finishing the process of dispersing + verifying a blob
36+
func (m *loadGeneratorMetrics) endOperation() {
37+
m.operationsInFlight.WithLabelValues().Dec()
38+
}

test/v2/load_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package v2
2+
3+
import (
4+
"github.com/Layr-Labs/eigenda/common/testutils/random"
5+
"github.com/docker/go-units"
6+
"os"
7+
"testing"
8+
)
9+
10+
func TestLightLoad(t *testing.T) {
11+
rand := random.NewTestRandom(t)
12+
c := getClient(t)
13+
14+
config := DefaultLoadGeneratorConfig()
15+
config.AverageBlobSize = 100 * units.KiB
16+
config.BlobSizeStdDev = 50 * units.KiB
17+
config.BytesPerSecond = 100 * units.KiB
18+
19+
generator := NewLoadGenerator(config, c, rand)
20+
21+
signals := make(chan os.Signal)
22+
go func() {
23+
<-signals
24+
generator.Stop()
25+
}()
26+
27+
generator.Start(true)
28+
}

0 commit comments

Comments
 (0)