Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/spf13/cobra"
"golang.org/x/time/rate"

"github.com/sei-protocol/sei-load/config"
"github.com/sei-protocol/sei-load/generator"
Expand All @@ -25,20 +26,6 @@ var (
configFile string
)

// ResolvedSettings holds the final resolved settings after applying precedence
type ResolvedSettings struct {
Workers int
TPS float64
StatsInterval time.Duration
BufferSize int
DryRun bool
Debug bool
TrackReceipts bool
TrackBlocks bool
TrackUserLatency bool
Prewarm bool
}

var rootCmd = &cobra.Command{
Use: "seiload",
Short: "Sei Chain Load Test v2",
Expand Down Expand Up @@ -149,8 +136,20 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to create generator: %w", err)
}

// Create shared rate limiter for all workers if TPS is specified
var sharedLimiter *rate.Limiter
if settings.TPS > 0 {
// Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds
intervalNs := int64((1.0 / settings.TPS) * 1e9)
sharedLimiter = rate.NewLimiter(rate.Every(time.Duration(intervalNs)), 1)
log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", settings.TPS)
} else {
// No rate limiting
sharedLimiter = rate.NewLimiter(rate.Inf, 1)
}

// Create the sender from the config struct
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers)
snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter)
if err != nil {
return fmt.Errorf("failed to create sender: %w", err)
}
Expand Down Expand Up @@ -192,11 +191,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {

// Create dispatcher
dispatcher := sender.NewDispatcher(gen, snd)
if settings.TPS > 0 {
// Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds
intervalNs := int64((1.0 / settings.TPS) * 1e9)
dispatcher.SetRateLimit(time.Duration(intervalNs))
}

// Set statistics collector for dispatcher
dispatcher.SetStatsCollector(collector)
Expand Down
26 changes: 0 additions & 26 deletions sender/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package sender
import (
"context"
"fmt"
"golang.org/x/time/rate"
"log"
"sync"
"time"

"github.com/sei-protocol/sei-load/generator"
"github.com/sei-protocol/sei-load/stats"
Expand All @@ -19,9 +17,6 @@ type Dispatcher struct {
prewarmGen utils.Option[generator.Generator] // Optional prewarm generator
sender TxSender

// Configuration
limiter *rate.Limiter

// Statistics
totalSent uint64
mu sync.RWMutex
Expand All @@ -33,17 +28,9 @@ func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher {
return &Dispatcher{
generator: gen,
sender: sender,
limiter: rate.NewLimiter(rate.Inf, 1), // No rate limiting by default
}
}

// SetRateLimit sets the minimum time between transaction generations
func (d *Dispatcher) SetRateLimit(duration time.Duration) {
d.mu.Lock()
defer d.mu.Unlock()
d.limiter = rate.NewLimiter(rate.Every(duration), 1)
}

// SetStatsCollector sets the statistics collector for this dispatcher
func (d *Dispatcher) SetStatsCollector(collector *stats.Collector) {
d.mu.Lock()
Expand Down Expand Up @@ -100,14 +87,7 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error {

// Start begins the dispatcher's transaction generation and sending loop
func (d *Dispatcher) Run(ctx context.Context) error {
d.mu.RLock()
limiter := d.limiter
d.mu.RUnlock()

for {
if err := limiter.Wait(ctx); err != nil {
return err
}
// Generate a transaction from main generator
tx, ok := d.generator.Generate()
if !ok {
Expand All @@ -130,13 +110,7 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error {
if count <= 0 {
return fmt.Errorf("count must be positive")
}
d.mu.RLock()
limiter := d.limiter
d.mu.RUnlock()
for i := range count {
if err := limiter.Wait(ctx); err != nil {
return err
}
// Generate a transaction
tx, ok := d.generator.Generate()
if !ok {
Expand Down
8 changes: 6 additions & 2 deletions sender/sharded_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"sync"

"golang.org/x/time/rate"

"github.com/sei-protocol/sei-load/config"
"github.com/sei-protocol/sei-load/stats"
"github.com/sei-protocol/sei-load/types"
Expand All @@ -21,23 +23,25 @@ type ShardedSender struct {
mu sync.RWMutex
collector *stats.Collector
logger *stats.Logger
limiter *rate.Limiter // Shared rate limiter for all workers
}

// NewShardedSender creates a new sharded sender with workers for each endpoint
func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int) (*ShardedSender, error) {
func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limiter *rate.Limiter) (*ShardedSender, error) {
if len(cfg.Endpoints) == 0 {
return nil, fmt.Errorf("no endpoints configured")
}

workerList := make([]*Worker, len(cfg.Endpoints))
for i, endpoint := range cfg.Endpoints {
workerList[i] = NewWorker(i, endpoint, bufferSize, workers)
workerList[i] = NewWorker(i, endpoint, bufferSize, workers, limiter)
}

return &ShardedSender{
workers: workerList,
numShards: len(cfg.Endpoints),
bufferSize: bufferSize,
limiter: limiter,
}, nil
}

Expand Down
16 changes: 14 additions & 2 deletions sender/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"bytes"
"context"
"fmt"
"github.com/ethereum/go-ethereum/ethclient"
"io"
"log"
"net"
"net/http"
"time"

"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/time/rate"

"github.com/sei-protocol/sei-load/stats"
"github.com/sei-protocol/sei-load/types"
"github.com/sei-protocol/sei-load/utils"
Expand All @@ -29,6 +31,7 @@ type Worker struct {
logger *stats.Logger
workers int
trackReceipts bool
limiter *rate.Limiter // Shared rate limiter for transaction sending
}

func newHttpClient() *http.Client {
Expand All @@ -50,14 +53,15 @@ func newHttpClient() *http.Client {
}

// NewWorker creates a new worker for a specific endpoint
func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker {
func NewWorker(id int, endpoint string, bufferSize int, workers int, limiter *rate.Limiter) *Worker {
return &Worker{
id: id,
endpoint: endpoint,
txChan: make(chan *types.LoadTx, bufferSize),
sentTxs: make(chan *types.LoadTx, bufferSize),
workers: workers,
trackReceipts: false,
limiter: limiter,
}
}

Expand Down Expand Up @@ -152,6 +156,14 @@ func (w *Worker) processTransactions(ctx context.Context, client *http.Client) e
if err != nil {
return err
}

// Apply rate limiting before sending the transaction
if w.limiter != nil {
if err := w.limiter.Wait(ctx); err != nil {
return err
}
}

startTime := time.Now()
err = w.sendTransaction(ctx, client, tx)
// Record statistics if collector is available
Expand Down
Loading