Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions generator/weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package generator

import (
"context"
"github.com/sei-protocol/sei-load/types"
"math/rand"
"sync"

"github.com/sei-protocol/sei-load/types"
)

// WeightedCfg is a configuration for a weighted scenarioGenerator.
Expand Down Expand Up @@ -32,7 +33,7 @@ func (w *weightedGenerator) GenerateInfinite(ctx context.Context) <-chan *types.
output := make(chan *types.LoadTx, 10000)
go func() {
defer close(output)
for {
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
Expand Down
7 changes: 4 additions & 3 deletions sender/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error {
logInterval := 100

// Run prewarm generator until completion
for {
for ctx.Err() == nil {
tx, ok := gen.Generate()
if !ok {
break // Prewarming is complete
Expand All @@ -87,7 +87,7 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error {

// Start begins the dispatcher's transaction generation and sending loop
func (d *Dispatcher) Run(ctx context.Context) error {
for {
for ctx.Err() == nil {
// Generate a transaction from main generator
tx, ok := d.generator.Generate()
if !ok {
Expand All @@ -103,6 +103,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
d.totalSent++
d.mu.Unlock()
}
return ctx.Err()
}

// StartBatch generates and sends a specific number of transactions then stops
Expand All @@ -126,7 +127,7 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error {
d.mu.Unlock()
}
}
return nil
return ctx.Err()
}

// GetStats returns dispatcher statistics
Expand Down
16 changes: 11 additions & 5 deletions sender/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package sender
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -111,7 +113,7 @@ func (w *Worker) watchTransactions(ctx context.Context) error {
if err != nil {
return fmt.Errorf("ethclient.Dial(%q): %w", w.endpoint, err)
}
for {
for ctx.Err() == nil {
tx, err := utils.Recv(ctx, w.sentTxs)
if err != nil {
return err
Expand All @@ -122,17 +124,19 @@ func (w *Worker) watchTransactions(ctx context.Context) error {
log.Printf("❌ %v", err)
}
}
return ctx.Err()
}

func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx *types.LoadTx) error {
ticker := time.NewTicker(100 * time.Millisecond)
for {
defer ticker.Stop()
for ctx.Err() == nil {
if _, err := utils.Recv(ctx, ticker.C); err != nil {
return fmt.Errorf("timeout waiting for receipt for tx %s", tx.EthTx.Hash().Hex())
}
receipt, err := eth.TransactionReceipt(context.Background(), tx.EthTx.Hash())
receipt, err := eth.TransactionReceipt(ctx, tx.EthTx.Hash())
if err != nil {
if err.Error() == "not found" {
if errors.Is(err, ethereum.NotFound) {
continue
}
log.Printf("❌ error getting receipt for tx %s: %v", tx.EthTx.Hash().Hex(), err)
Expand All @@ -147,11 +151,12 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx *
}
return nil
}
return ctx.Err()
}

// processTransactions is the main worker loop that processes transactions
func (w *Worker) processTransactions(ctx context.Context, client *http.Client) error {
for {
for ctx.Err() == nil {
tx, err := utils.Recv(ctx, w.txChan)
if err != nil {
return err
Expand All @@ -174,6 +179,7 @@ func (w *Worker) processTransactions(ctx context.Context, client *http.Client) e
log.Printf("%v", err)
}
}
return ctx.Err()
}

// sendTransaction sends a single transaction to the endpoint
Expand Down
3 changes: 2 additions & 1 deletion stats/block_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,14 @@ func (bc *BlockCollector) Run(ctx context.Context, firstEndpoint string) error {
return subErr
})
log.Printf("📡 Subscribed to new blocks on %s", wsEndpoint)
for {
for ctx.Err() == nil {
header, err := utils.Recv(ctx, headers)
if err != nil {
return err
}
bc.processNewBlock(header)
}
return ctx.Err()
})
}

Expand Down
7 changes: 5 additions & 2 deletions stats/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package stats

import (
"context"
"github.com/sei-protocol/sei-load/utils"
"log"
"time"

"github.com/sei-protocol/sei-load/utils"
)

// Logger handles periodic statistics logging and dry-run transaction printing
Expand All @@ -26,12 +27,14 @@ func NewLogger(collector *Collector, interval time.Duration, debug bool) *Logger
// Start begins periodic statistics logging
func (l *Logger) Run(ctx context.Context) error {
ticker := time.NewTicker(l.interval)
for {
defer ticker.Stop()
for ctx.Err() == nil {
if _, err := utils.Recv(ctx, ticker.C); err != nil {
return err
}
l.logCurrentStats()
}
return ctx.Err()
}

// logCurrentStats logs the current statistics
Expand Down
5 changes: 3 additions & 2 deletions stats/user_latency_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func NewUserLatencyTracker(interval time.Duration) *UserLatencyTracker {
func (ult *UserLatencyTracker) Run(ctx context.Context, endpoint string) error {
// Create ticker for the configured interval
ticker := time.NewTicker(ult.interval)

defer ticker.Stop()
// Connect to the endpoint
client, err := ethclient.Dial(endpoint)
if err != nil {
return err
}
defer client.Close()

for {
for ctx.Err() == nil {
if _, err := utils.Recv(ctx, ticker.C); err != nil {
return err
}
Expand All @@ -45,6 +45,7 @@ func (ult *UserLatencyTracker) Run(ctx context.Context, endpoint string) error {
// Continue on error - don't stop the tracker
}
}
return ctx.Err()
}

// trackLatency fetches the latest block and calculates user latency statistics
Expand Down
Loading