From 76b028dfd48a8d87db4498a3e6f4d7a9e2d3ca16 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Wed, 6 Aug 2025 09:34:52 -0400 Subject: [PATCH] add global limiter --- main.go | 34 ++++++++++++++-------------------- sender/dispatcher.go | 26 -------------------------- sender/sharded_sender.go | 8 ++++++-- sender/worker.go | 16 ++++++++++++++-- 4 files changed, 34 insertions(+), 50 deletions(-) diff --git a/main.go b/main.go index fadf822..10deb49 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "time" "github.com/spf13/cobra" + "golang.org/x/time/rate" "github.com/sei-protocol/sei-load/config" "github.com/sei-protocol/sei-load/generator" @@ -25,20 +26,6 @@ var ( configFile string ) -// ResolvedSettings holds the final resolved settings after applying precedence -type ResolvedSettings struct { - Workers int - TPS float64 - StatsInterval time.Duration - BufferSize int - DryRun bool - Debug bool - TrackReceipts bool - TrackBlocks bool - TrackUserLatency bool - Prewarm bool -} - var rootCmd = &cobra.Command{ Use: "seiload", Short: "Sei Chain Load Test v2", @@ -149,8 +136,20 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to create generator: %w", err) } + // Create shared rate limiter for all workers if TPS is specified + var sharedLimiter *rate.Limiter + if settings.TPS > 0 { + // Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds + intervalNs := int64((1.0 / settings.TPS) * 1e9) + sharedLimiter = rate.NewLimiter(rate.Every(time.Duration(intervalNs)), 1) + log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", settings.TPS) + } else { + // No rate limiting + sharedLimiter = rate.NewLimiter(rate.Inf, 1) + } + // Create the sender from the config struct - snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers) + snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter) if err != nil { return fmt.Errorf("failed to create sender: %w", err) } @@ -192,11 +191,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Create dispatcher dispatcher := sender.NewDispatcher(gen, snd) - if settings.TPS > 0 { - // Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds - intervalNs := int64((1.0 / settings.TPS) * 1e9) - dispatcher.SetRateLimit(time.Duration(intervalNs)) - } // Set statistics collector for dispatcher dispatcher.SetStatsCollector(collector) diff --git a/sender/dispatcher.go b/sender/dispatcher.go index f26edd3..814072e 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -3,10 +3,8 @@ package sender import ( "context" "fmt" - "golang.org/x/time/rate" "log" "sync" - "time" "github.com/sei-protocol/sei-load/generator" "github.com/sei-protocol/sei-load/stats" @@ -19,9 +17,6 @@ type Dispatcher struct { prewarmGen utils.Option[generator.Generator] // Optional prewarm generator sender TxSender - // Configuration - limiter *rate.Limiter - // Statistics totalSent uint64 mu sync.RWMutex @@ -33,17 +28,9 @@ func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher { return &Dispatcher{ generator: gen, sender: sender, - limiter: rate.NewLimiter(rate.Inf, 1), // No rate limiting by default } } -// SetRateLimit sets the minimum time between transaction generations -func (d *Dispatcher) SetRateLimit(duration time.Duration) { - d.mu.Lock() - defer d.mu.Unlock() - d.limiter = rate.NewLimiter(rate.Every(duration), 1) -} - // SetStatsCollector sets the statistics collector for this dispatcher func (d *Dispatcher) SetStatsCollector(collector *stats.Collector) { d.mu.Lock() @@ -100,14 +87,7 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error { // Start begins the dispatcher's transaction generation and sending loop func (d *Dispatcher) Run(ctx context.Context) error { - d.mu.RLock() - limiter := d.limiter - d.mu.RUnlock() - for { - if err := limiter.Wait(ctx); err != nil { - return err - } // Generate a transaction from main generator tx, ok := d.generator.Generate() if !ok { @@ -130,13 +110,7 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error { if count <= 0 { return fmt.Errorf("count must be positive") } - d.mu.RLock() - limiter := d.limiter - d.mu.RUnlock() for i := range count { - if err := limiter.Wait(ctx); err != nil { - return err - } // Generate a transaction tx, ok := d.generator.Generate() if !ok { diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index 86085d1..84f42c1 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -5,6 +5,8 @@ import ( "fmt" "sync" + "golang.org/x/time/rate" + "github.com/sei-protocol/sei-load/config" "github.com/sei-protocol/sei-load/stats" "github.com/sei-protocol/sei-load/types" @@ -21,23 +23,25 @@ type ShardedSender struct { mu sync.RWMutex collector *stats.Collector logger *stats.Logger + limiter *rate.Limiter // Shared rate limiter for all workers } // NewShardedSender creates a new sharded sender with workers for each endpoint -func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int) (*ShardedSender, error) { +func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int, limiter *rate.Limiter) (*ShardedSender, error) { if len(cfg.Endpoints) == 0 { return nil, fmt.Errorf("no endpoints configured") } workerList := make([]*Worker, len(cfg.Endpoints)) for i, endpoint := range cfg.Endpoints { - workerList[i] = NewWorker(i, endpoint, bufferSize, workers) + workerList[i] = NewWorker(i, endpoint, bufferSize, workers, limiter) } return &ShardedSender{ workers: workerList, numShards: len(cfg.Endpoints), bufferSize: bufferSize, + limiter: limiter, }, nil } diff --git a/sender/worker.go b/sender/worker.go index c1ecaae..0c3d6a2 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -4,13 +4,15 @@ import ( "bytes" "context" "fmt" - "github.com/ethereum/go-ethereum/ethclient" "io" "log" "net" "net/http" "time" + "github.com/ethereum/go-ethereum/ethclient" + "golang.org/x/time/rate" + "github.com/sei-protocol/sei-load/stats" "github.com/sei-protocol/sei-load/types" "github.com/sei-protocol/sei-load/utils" @@ -29,6 +31,7 @@ type Worker struct { logger *stats.Logger workers int trackReceipts bool + limiter *rate.Limiter // Shared rate limiter for transaction sending } func newHttpClient() *http.Client { @@ -50,7 +53,7 @@ func newHttpClient() *http.Client { } // NewWorker creates a new worker for a specific endpoint -func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker { +func NewWorker(id int, endpoint string, bufferSize int, workers int, limiter *rate.Limiter) *Worker { return &Worker{ id: id, endpoint: endpoint, @@ -58,6 +61,7 @@ func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker { sentTxs: make(chan *types.LoadTx, bufferSize), workers: workers, trackReceipts: false, + limiter: limiter, } } @@ -152,6 +156,14 @@ func (w *Worker) processTransactions(ctx context.Context, client *http.Client) e if err != nil { return err } + + // Apply rate limiting before sending the transaction + if w.limiter != nil { + if err := w.limiter.Wait(ctx); err != nil { + return err + } + } + startTime := time.Now() err = w.sendTransaction(ctx, client, tx) // Record statistics if collector is available