Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion generator/scenarios/EVMTransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scenarios

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -49,7 +50,7 @@ func (s *EVMTransferScenario) CreateTransaction(config *config.LoadConfig, scena
tx := &ethtypes.DynamicFeeTx{
Nonce: scenario.Nonce,
To: &scenario.Receiver,
Value: bigOne,
Value: big.NewInt(time.Now().Unix()),
Gas: 21000, // Standard gas limit for ETH transfer
GasTipCap: big.NewInt(2000000000), // 2 gwei
GasFeeCap: big.NewInt(20000000000), // 20 gwei
Expand Down
2 changes: 2 additions & 0 deletions generator/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -47,6 +48,7 @@ func createTransactOpts(chainID *big.Int, account *loadtypes.Account, gasLimit u
// Set transaction parameters
auth.Nonce = big.NewInt(int64(account.Nonce))
auth.NoSend = noSend
auth.Value = big.NewInt(time.Now().Unix())

auth.GasLimit = gasLimit
auth.GasTipCap = big.NewInt(2000000000) // 2 gwei tip (priority fee)
Expand Down
46 changes: 31 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (
)

var (
configFile string
statsInterval time.Duration
bufferSize int
tps float64
dryRun bool
debug bool
workers int
trackReceipts bool
trackBlocks bool
prewarm bool
configFile string
statsInterval time.Duration
bufferSize int
tps float64
dryRun bool
debug bool
workers int
trackReceipts bool
trackBlocks bool
prewarm bool
trackUserLatency bool
)

var rootCmd = &cobra.Command{
Expand Down Expand Up @@ -62,6 +63,7 @@ func init() {
rootCmd.Flags().BoolVarP(&trackReceipts, "track-receipts", "", false, "Track receipts")
rootCmd.Flags().BoolVarP(&trackBlocks, "track-blocks", "", false, "Track blocks")
rootCmd.Flags().BoolVarP(&prewarm, "prewarm", "", false, "Prewarm accounts with self-transactions")
rootCmd.Flags().BoolVarP(&trackUserLatency, "track-user-latency", "", false, "Track user latency")
rootCmd.Flags().IntVarP(&workers, "workers", "w", 1, "Number of workers")

if err := rootCmd.MarkFlagRequired("config"); err != nil {
Expand All @@ -83,7 +85,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
// Parse the config file into a config.LoadConfig struct
cfg, err := loadConfig(configFile)
if err != nil {
return fmt.Errorf("Failed to load config: %w", err)
return fmt.Errorf("failed to load config: %w", err)
}

log.Printf("🚀 Starting Sei Chain Load Test v2")
Expand All @@ -109,6 +111,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
if prewarm {
log.Printf("📝 Prewarm: enabled")
}
if trackUserLatency {
log.Printf("📝 Track user latency: enabled")
}
log.Println()

// Enable mock deployment in dry-run mode
Expand All @@ -124,13 +129,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
// Create the generator from the config struct
gen, err := generator.NewConfigBasedGenerator(cfg)
if err != nil {
return fmt.Errorf("Failed to create generator: %w", err)
return fmt.Errorf("failed to create generator: %w", err)
}

// Create the sender from the config struct
snd, err := sender.NewShardedSender(cfg, bufferSize, workers)
if err != nil {
return fmt.Errorf("Failed to create sender: %w", err)
return fmt.Errorf("failed to create sender: %w", err)
}

// Create and start block collector if endpoints are available
Expand All @@ -143,6 +148,14 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
})
}

// Create and start user latency tracker if endpoints are available
if len(cfg.Endpoints) > 0 && trackUserLatency {
userLatencyTracker := stats.NewUserLatencyTracker(statsInterval)
s.SpawnBgNamed("user latency tracker", func() error {
return userLatencyTracker.Run(ctx, cfg.Endpoints[0])
})
}

// Enable dry-run mode in sender if specified
if dryRun {
snd.SetDryRun(true)
Expand Down Expand Up @@ -187,7 +200,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
// Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions)
if prewarm {
if err := dispatcher.Prewarm(ctx); err != nil {
return fmt.Errorf("Failed to prewarm accounts: %w", err)
return fmt.Errorf("failed to prewarm accounts: %w", err)
}
}

Expand Down Expand Up @@ -216,6 +229,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
if trackBlocks {
log.Printf("📝 Track blocks mode: Block data will be collected")
}
if trackUserLatency {
log.Printf("📝 Track user latency mode: User latency will be tracked")
}
log.Print(strings.Repeat("=", 60))

// Main loop - wait for shutdown signal
Expand All @@ -240,7 +256,7 @@ func loadConfig(filename string) (*config.LoadConfig, error) {

var cfg config.LoadConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, fmt.Errorf("failed to parse config JSON: %w", err)
return nil, fmt.Errorf("failed to parse config json: %w", err)
}

// Validate configuration
Expand Down
2 changes: 1 addition & 1 deletion sender/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error {
// Generate a transaction
tx, ok := d.generator.Generate()
if !ok {
return fmt.Errorf("Dispatcher: Generator returned nil transaction (batch %d/%d)\n", i+1, count)
return fmt.Errorf("dispatcher: generator returned nil transaction (batch %d/%d)", i+1, count)
}
// Send the transaction
if err := d.sender.Send(ctx, tx); err != nil {
Expand Down
24 changes: 14 additions & 10 deletions sender/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker {
txChan: make(chan *types.LoadTx, bufferSize),
sentTxs: make(chan *types.LoadTx, bufferSize),
workers: workers,
trackReceipts: true,
trackReceipts: false,
}
}

Expand Down Expand Up @@ -187,15 +187,19 @@ func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *t
if err != nil {
return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err)
}
defer resp.Body.Close()

// Always read and discard response body to enable connection reuse
// Limit read to prevent memory issues with large responses
_, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB
if err != nil && err != io.EOF {
log.Printf("Worker %d: Failed to read response body: %v", w.id, err)
// Log but don't fail - this is just for connection reuse
}
defer func() {
// Limit read to prevent memory issues with large responses
_, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB
if err != nil && err != io.EOF {
log.Printf("Worker %d: Failed to read response body: %v", w.id, err)
// Log but don't fail - this is just for connection reuse
}

// Close response body and handle error
if closeErr := resp.Body.Close(); closeErr != nil {
log.Printf("Worker %d: Failed to close response body: %v", w.id, closeErr)
}
}()

// Check response status
if resp.StatusCode != http.StatusOK {
Expand Down
6 changes: 2 additions & 4 deletions stats/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ func (l *Logger) logCurrentStats() {
}

// Print overall summary line
log.Printf("[%s] throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)",
time.Now().Format("15:04:05"),
log.Printf("throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)",
totalWindowTPS,
totalTxs,
overallAvgLatency.Round(time.Millisecond),
Expand All @@ -122,8 +121,7 @@ func (l *Logger) logCurrentStats() {

// Print block statistics if available
if stats.BlockStats != nil && stats.BlockStats.SampleCount > 0 {
log.Printf("[%s] %s",
time.Now().Format("15:04:05"),
log.Printf("%s",
stats.BlockStats.FormatBlockStats())
}

Expand Down
107 changes: 107 additions & 0 deletions stats/user_latency_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package stats

import (
"context"
"log"
"math/big"
"sort"
"time"

"github.com/ethereum/go-ethereum/ethclient"

"github.com/sei-protocol/sei-load/utils"
)

// UserLatencyTracker tracks user latency by analyzing block transactions
type UserLatencyTracker struct {
interval time.Duration
}

// NewUserLatencyTracker creates a new user latency tracker
func NewUserLatencyTracker(interval time.Duration) *UserLatencyTracker {
return &UserLatencyTracker{
interval: interval,
}
}

// Run starts the user latency tracking loop
func (ult *UserLatencyTracker) Run(ctx context.Context, endpoint string) error {
// Create ticker for the configured interval
ticker := time.NewTicker(ult.interval)

// Connect to the endpoint
client, err := ethclient.Dial(endpoint)
if err != nil {
return err
}
defer client.Close()

for {
if _, err := utils.Recv(ctx, ticker.C); err != nil {
return err
}
if err := ult.trackLatency(ctx, client); err != nil {
log.Printf("User latency tracker: Error tracking latency: %v", err)
// Continue on error - don't stop the tracker
}
}
}

// trackLatency fetches the latest block and calculates user latency statistics
func (ult *UserLatencyTracker) trackLatency(ctx context.Context, client *ethclient.Client) error {
// Get the latest block with transactions
block, err := client.BlockByNumber(ctx, nil)
if err != nil {
return err
}

// Skip if no transactions
txs := block.Transactions()
if len(txs) == 0 {
log.Printf("User latency tracker: Block %d has no transactions", block.NumberU64())
return nil
}

// Calculate latencies for each transaction
var latencies []time.Duration
blockTimestamp := time.Unix(int64(block.Time()), 0)

for i, tx := range txs {
// Extract timestamp from transaction value (set to time.Now().Unix() during creation)
if tx.Value() != nil && tx.Value().Cmp(big.NewInt(0)) > 0 {
txTimestamp := time.Unix(tx.Value().Int64(), 0)
latency := blockTimestamp.Sub(txTimestamp)

// Only include positive latencies (sanity check)
if latency >= 0 {
latencies = append(latencies, latency)
} else {
log.Printf("User latency tracker: Negative latency detected: %v", latency)
}
} else {
log.Printf("User latency tracker: TX %d has nil or zero value", i)
}
}

// Skip logging if no valid latencies
if len(latencies) == 0 {
log.Printf("User latency tracker: No valid latencies found in block %d", block.NumberU64())
return nil
}

// Calculate statistics
sort.Slice(latencies, func(i, j int) bool {
return latencies[i] < latencies[j]
})

minLatency := latencies[0]
maxLatency := latencies[len(latencies)-1]
p50 := latencies[len(latencies)/2]

// Log the summary
log.Printf("user latency height=%d txs=%d min=%v p50=%v max=%v",
block.NumberU64(), len(latencies),
minLatency, p50, maxLatency)

return nil
}
2 changes: 1 addition & 1 deletion utils/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (w *AtomicSend[T]) Update(f func(T) (T, bool)) {
}

func NewAtomicSend[T any](value T) (w AtomicSend[T]) {
w.atomicWatch.ptr.Store(newVersion(value))
w.ptr.Store(newVersion(value))
// nolint:nakedret
return
}
Expand Down
Loading