From 5f8bd1bf2b48cc55dcd1675e9461d03542c6bd1c Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 4 Aug 2025 09:37:10 -0400 Subject: [PATCH 1/4] add user latency tracking --- generator/scenarios/EVMTransfer.go | 3 +- generator/utils/utils.go | 2 + main.go | 46 ++++++++---- sender/dispatcher.go | 4 +- sender/worker.go | 24 ++++--- stats/logger.go | 6 +- stats/user_latency_tracker.go | 111 +++++++++++++++++++++++++++++ utils/mutex.go | 19 +++-- 8 files changed, 178 insertions(+), 37 deletions(-) create mode 100644 stats/user_latency_tracker.go diff --git a/generator/scenarios/EVMTransfer.go b/generator/scenarios/EVMTransfer.go index 441025b..7ffe593 100644 --- a/generator/scenarios/EVMTransfer.go +++ b/generator/scenarios/EVMTransfer.go @@ -2,6 +2,7 @@ package scenarios import ( "math/big" + "time" "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -49,7 +50,7 @@ func (s *EVMTransferScenario) CreateTransaction(config *config.LoadConfig, scena tx := ðtypes.DynamicFeeTx{ Nonce: scenario.Nonce, To: &scenario.Receiver, - Value: bigOne, + Value: big.NewInt(time.Now().Unix()), Gas: 21000, // Standard gas limit for ETH transfer GasTipCap: big.NewInt(2000000000), // 2 gwei GasFeeCap: big.NewInt(20000000000), // 20 gwei diff --git a/generator/utils/utils.go b/generator/utils/utils.go index 9d9eaab..5244699 100644 --- a/generator/utils/utils.go +++ b/generator/utils/utils.go @@ -2,6 +2,7 @@ package utils import ( "math/big" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -47,6 +48,7 @@ func createTransactOpts(chainID *big.Int, account *loadtypes.Account, gasLimit u // Set transaction parameters auth.Nonce = big.NewInt(int64(account.Nonce)) auth.NoSend = noSend + auth.Value = big.NewInt(time.Now().Unix()) auth.GasLimit = gasLimit auth.GasTipCap = big.NewInt(2000000000) // 2 gwei tip (priority fee) diff --git a/main.go b/main.go index 5442ddb..042c769 100644 --- a/main.go +++ b/main.go @@ -22,16 +22,17 @@ import ( ) var ( - configFile string - statsInterval time.Duration - bufferSize int - tps float64 - dryRun bool - debug bool - workers int - trackReceipts bool - trackBlocks bool - prewarm bool + configFile string + statsInterval time.Duration + bufferSize int + tps float64 + dryRun bool + debug bool + workers int + trackReceipts bool + trackBlocks bool + prewarm bool + trackUserLatency bool ) var rootCmd = &cobra.Command{ @@ -62,6 +63,7 @@ func init() { rootCmd.Flags().BoolVarP(&trackReceipts, "track-receipts", "", false, "Track receipts") rootCmd.Flags().BoolVarP(&trackBlocks, "track-blocks", "", false, "Track blocks") rootCmd.Flags().BoolVarP(&prewarm, "prewarm", "", false, "Prewarm accounts with self-transactions") + rootCmd.Flags().BoolVarP(&trackUserLatency, "track-user-latency", "", false, "Track user latency") rootCmd.Flags().IntVarP(&workers, "workers", "w", 1, "Number of workers") if err := rootCmd.MarkFlagRequired("config"); err != nil { @@ -83,7 +85,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Parse the config file into a config.LoadConfig struct cfg, err := loadConfig(configFile) if err != nil { - return fmt.Errorf("Failed to load config: %w", err) + return fmt.Errorf("failed to load config: %w", err) } log.Printf("🚀 Starting Sei Chain Load Test v2") @@ -109,6 +111,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { if prewarm { log.Printf("📝 Prewarm: enabled") } + if trackUserLatency { + log.Printf("📝 Track user latency: enabled") + } log.Println() // Enable mock deployment in dry-run mode @@ -124,13 +129,13 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Create the generator from the config struct gen, err := generator.NewConfigBasedGenerator(cfg) if err != nil { - return fmt.Errorf("Failed to create generator: %w", err) + return fmt.Errorf("failed to create generator: %w", err) } // Create the sender from the config struct snd, err := sender.NewShardedSender(cfg, bufferSize, workers) if err != nil { - return fmt.Errorf("Failed to create sender: %w", err) + return fmt.Errorf("failed to create sender: %w", err) } // Create and start block collector if endpoints are available @@ -143,6 +148,14 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { }) } + // Create and start user latency tracker if endpoints are available + if len(cfg.Endpoints) > 0 && trackUserLatency { + userLatencyTracker := stats.NewUserLatencyTracker(statsInterval) + s.SpawnBgNamed("user latency tracker", func() error { + return userLatencyTracker.Run(ctx, cfg.Endpoints[0]) + }) + } + // Enable dry-run mode in sender if specified if dryRun { snd.SetDryRun(true) @@ -187,7 +200,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions) if prewarm { if err := dispatcher.Prewarm(ctx); err != nil { - return fmt.Errorf("Failed to prewarm accounts: %w", err) + return fmt.Errorf("failed to prewarm accounts: %w", err) } } @@ -216,6 +229,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { if trackBlocks { log.Printf("📝 Track blocks mode: Block data will be collected") } + if trackUserLatency { + log.Printf("📝 Track user latency mode: User latency will be tracked") + } log.Print(strings.Repeat("=", 60)) // Main loop - wait for shutdown signal @@ -240,7 +256,7 @@ func loadConfig(filename string) (*config.LoadConfig, error) { var cfg config.LoadConfig if err := json.Unmarshal(data, &cfg); err != nil { - return nil, fmt.Errorf("failed to parse config JSON: %w", err) + return nil, fmt.Errorf("failed to parse config json: %w", err) } // Validate configuration diff --git a/sender/dispatcher.go b/sender/dispatcher.go index e62a884..a7cda6f 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -133,14 +133,14 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error { d.mu.RLock() limiter := d.limiter d.mu.RUnlock() - for i := range count { + for i := 0; i < count; i++ { if err := limiter.Wait(ctx); err != nil { return err } // Generate a transaction tx, ok := d.generator.Generate() if !ok { - return fmt.Errorf("Dispatcher: Generator returned nil transaction (batch %d/%d)\n", i+1, count) + return fmt.Errorf("dispatcher: generator returned nil transaction (batch %d/%d)", i+1, count) } // Send the transaction if err := d.sender.Send(ctx, tx); err != nil { diff --git a/sender/worker.go b/sender/worker.go index 6e06e36..c1ecaae 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -57,7 +57,7 @@ func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker { txChan: make(chan *types.LoadTx, bufferSize), sentTxs: make(chan *types.LoadTx, bufferSize), workers: workers, - trackReceipts: true, + trackReceipts: false, } } @@ -187,15 +187,19 @@ func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *t if err != nil { return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err) } - defer resp.Body.Close() - - // Always read and discard response body to enable connection reuse - // Limit read to prevent memory issues with large responses - _, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB - if err != nil && err != io.EOF { - log.Printf("Worker %d: Failed to read response body: %v", w.id, err) - // Log but don't fail - this is just for connection reuse - } + defer func() { + // Limit read to prevent memory issues with large responses + _, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB + if err != nil && err != io.EOF { + log.Printf("Worker %d: Failed to read response body: %v", w.id, err) + // Log but don't fail - this is just for connection reuse + } + + // Close response body and handle error + if closeErr := resp.Body.Close(); closeErr != nil { + log.Printf("Worker %d: Failed to close response body: %v", w.id, closeErr) + } + }() // Check response status if resp.StatusCode != http.StatusOK { diff --git a/stats/logger.go b/stats/logger.go index 5582c91..ff6cca9 100644 --- a/stats/logger.go +++ b/stats/logger.go @@ -111,8 +111,7 @@ func (l *Logger) logCurrentStats() { } // Print overall summary line - log.Printf("[%s] throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)", - time.Now().Format("15:04:05"), + log.Printf("throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)", totalWindowTPS, totalTxs, overallAvgLatency.Round(time.Millisecond), @@ -122,8 +121,7 @@ func (l *Logger) logCurrentStats() { // Print block statistics if available if stats.BlockStats != nil && stats.BlockStats.SampleCount > 0 { - log.Printf("[%s] %s", - time.Now().Format("15:04:05"), + log.Printf("%s", stats.BlockStats.FormatBlockStats()) } diff --git a/stats/user_latency_tracker.go b/stats/user_latency_tracker.go new file mode 100644 index 0000000..af916f3 --- /dev/null +++ b/stats/user_latency_tracker.go @@ -0,0 +1,111 @@ +package stats + +import ( + "context" + "log" + "math/big" + "sort" + "time" + + "github.com/ethereum/go-ethereum/ethclient" +) + +// UserLatencyTracker tracks user latency by analyzing block transactions +type UserLatencyTracker struct { + endpoint string + interval time.Duration +} + +// NewUserLatencyTracker creates a new user latency tracker +func NewUserLatencyTracker(interval time.Duration) *UserLatencyTracker { + return &UserLatencyTracker{ + interval: interval, + } +} + +// Run starts the user latency tracking loop +func (ult *UserLatencyTracker) Run(ctx context.Context, endpoint string) error { + ult.endpoint = endpoint + + // Create ticker for the configured interval + ticker := time.NewTicker(ult.interval) + defer ticker.Stop() + + // Connect to the endpoint + client, err := ethclient.Dial(endpoint) + if err != nil { + return err + } + defer client.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := ult.trackLatency(ctx, client); err != nil { + log.Printf("User latency tracker: Error tracking latency: %v", err) + // Continue on error - don't stop the tracker + } + } + } +} + +// trackLatency fetches the latest block and calculates user latency statistics +func (ult *UserLatencyTracker) trackLatency(ctx context.Context, client *ethclient.Client) error { + // Get the latest block with transactions + block, err := client.BlockByNumber(ctx, nil) + if err != nil { + return err + } + + // Skip if no transactions + txs := block.Transactions() + if len(txs) == 0 { + log.Printf("User latency tracker: Block %d has no transactions", block.NumberU64()) + return nil + } + + // Calculate latencies for each transaction + var latencies []time.Duration + blockTimestamp := time.Unix(int64(block.Time()), 0) + + for i, tx := range txs { + // Extract timestamp from transaction value (set to time.Now().Unix() during creation) + if tx.Value() != nil && tx.Value().Cmp(big.NewInt(0)) > 0 { + txTimestamp := time.Unix(tx.Value().Int64(), 0) + latency := blockTimestamp.Sub(txTimestamp) + + // Only include positive latencies (sanity check) + if latency >= 0 { + latencies = append(latencies, latency) + } else { + log.Printf("User latency tracker: Negative latency detected: %v", latency) + } + } else { + log.Printf("User latency tracker: TX %d has nil or zero value", i) + } + } + + // Skip logging if no valid latencies + if len(latencies) == 0 { + log.Printf("User latency tracker: No valid latencies found in block %d", block.NumberU64()) + return nil + } + + // Calculate statistics + sort.Slice(latencies, func(i, j int) bool { + return latencies[i] < latencies[j] + }) + + minLatency := latencies[0] + maxLatency := latencies[len(latencies)-1] + p50 := latencies[len(latencies)/2] + + // Log the summary + log.Printf("user latency height=%d txs=%d min=%v p50=%v max=%v", + block.NumberU64(), len(latencies), + minLatency, p50, maxLatency) + + return nil +} diff --git a/utils/mutex.go b/utils/mutex.go index 88378b4..9599c9a 100644 --- a/utils/mutex.go +++ b/utils/mutex.go @@ -49,7 +49,7 @@ type atomicWatch[T any] struct { } type AtomicSend[T any] struct { - atomicWatch[T] + *atomicWatch[T] } // Store updates the value of the atomic watch. @@ -67,13 +67,14 @@ func (w *AtomicSend[T]) Update(f func(T) (T, bool)) { } func NewAtomicSend[T any](value T) (w AtomicSend[T]) { - w.atomicWatch.ptr.Store(newVersion(value)) + w.atomicWatch = &atomicWatch[T]{} + w.ptr.Store(newVersion(value)) // nolint:nakedret return } func (w *AtomicSend[T]) Subscribe() AtomicRecv[T] { - return AtomicRecv[T]{&w.atomicWatch} + return AtomicRecv[T]{w.atomicWatch} } // AtomicWatch stores a pointer to an IMMUTABLE value. @@ -81,7 +82,7 @@ func (w *AtomicSend[T]) Subscribe() AtomicRecv[T] { // TODO(gprusak): remove mutex and rename to AtomicSend, // this will allow for sharing a mutex across multiple AtomicSenders. type AtomicWatch[T any] struct { - atomicWatch[T] + *atomicWatch[T] mu sync.Mutex } @@ -97,7 +98,7 @@ func NewAtomicWatch[T any](value T) (w AtomicWatch[T]) { // Subscribe returns a view-only API of the atomic watch. func (w *AtomicWatch[T]) Subscribe() AtomicRecv[T] { - return AtomicRecv[T]{&w.atomicWatch} + return AtomicRecv[T]{w.atomicWatch} } // Load returns the current value of the atomic watch. @@ -232,3 +233,11 @@ func (w *Watch[T]) Lock() iter.Seq2[T, *WatchCtrl] { _ = yield(w.val, &w.ctrl) } } + +// Set updates the value and notifies all watchers +func (w *Mutex[T]) Set(value T) { + newVersion := newVersion(value) + w.mu.Lock() + defer w.mu.Unlock() + w.value = newVersion.value +} From fd6164c03c47d97601c180c5468626fd67d8d6f0 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 4 Aug 2025 09:48:48 -0400 Subject: [PATCH 2/4] clean up changes for lint --- utils/mutex.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/utils/mutex.go b/utils/mutex.go index 9599c9a..beb1154 100644 --- a/utils/mutex.go +++ b/utils/mutex.go @@ -49,7 +49,7 @@ type atomicWatch[T any] struct { } type AtomicSend[T any] struct { - *atomicWatch[T] + atomicWatch[T] } // Store updates the value of the atomic watch. @@ -67,14 +67,13 @@ func (w *AtomicSend[T]) Update(f func(T) (T, bool)) { } func NewAtomicSend[T any](value T) (w AtomicSend[T]) { - w.atomicWatch = &atomicWatch[T]{} w.ptr.Store(newVersion(value)) // nolint:nakedret return } func (w *AtomicSend[T]) Subscribe() AtomicRecv[T] { - return AtomicRecv[T]{w.atomicWatch} + return AtomicRecv[T]{&w.atomicWatch} } // AtomicWatch stores a pointer to an IMMUTABLE value. @@ -82,7 +81,7 @@ func (w *AtomicSend[T]) Subscribe() AtomicRecv[T] { // TODO(gprusak): remove mutex and rename to AtomicSend, // this will allow for sharing a mutex across multiple AtomicSenders. type AtomicWatch[T any] struct { - *atomicWatch[T] + atomicWatch[T] mu sync.Mutex } @@ -98,7 +97,7 @@ func NewAtomicWatch[T any](value T) (w AtomicWatch[T]) { // Subscribe returns a view-only API of the atomic watch. func (w *AtomicWatch[T]) Subscribe() AtomicRecv[T] { - return AtomicRecv[T]{w.atomicWatch} + return AtomicRecv[T]{&w.atomicWatch} } // Load returns the current value of the atomic watch. @@ -233,11 +232,3 @@ func (w *Watch[T]) Lock() iter.Seq2[T, *WatchCtrl] { _ = yield(w.val, &w.ctrl) } } - -// Set updates the value and notifies all watchers -func (w *Mutex[T]) Set(value T) { - newVersion := newVersion(value) - w.mu.Lock() - defer w.mu.Unlock() - w.value = newVersion.value -} From a9d5e6672cc24a3d6808473c34bbd8957e4f93ce Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 4 Aug 2025 10:23:45 -0400 Subject: [PATCH 3/4] cleanup --- sender/dispatcher.go | 2 +- stats/user_latency_tracker.go | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sender/dispatcher.go b/sender/dispatcher.go index a7cda6f..f26edd3 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -133,7 +133,7 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error { d.mu.RLock() limiter := d.limiter d.mu.RUnlock() - for i := 0; i < count; i++ { + for i := range count { if err := limiter.Wait(ctx); err != nil { return err } diff --git a/stats/user_latency_tracker.go b/stats/user_latency_tracker.go index af916f3..5f5f242 100644 --- a/stats/user_latency_tracker.go +++ b/stats/user_latency_tracker.go @@ -2,6 +2,7 @@ package stats import ( "context" + "github.com/sei-protocol/sei-load/utils" "log" "math/big" "sort" @@ -39,14 +40,12 @@ func (ult *UserLatencyTracker) Run(ctx context.Context, endpoint string) error { defer client.Close() for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-ticker.C: - if err := ult.trackLatency(ctx, client); err != nil { - log.Printf("User latency tracker: Error tracking latency: %v", err) - // Continue on error - don't stop the tracker - } + if _, err := utils.Recv(ctx, ticker.C); err != nil { + return err + } + if err := ult.trackLatency(ctx, client); err != nil { + log.Printf("User latency tracker: Error tracking latency: %v", err) + // Continue on error - don't stop the tracker } } } From 3d572e169eb9c1860f15794f1b62cd712032cf2b Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 4 Aug 2025 10:25:58 -0400 Subject: [PATCH 4/4] fix unused endpoint --- stats/user_latency_tracker.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/stats/user_latency_tracker.go b/stats/user_latency_tracker.go index 5f5f242..d6525de 100644 --- a/stats/user_latency_tracker.go +++ b/stats/user_latency_tracker.go @@ -2,18 +2,18 @@ package stats import ( "context" - "github.com/sei-protocol/sei-load/utils" "log" "math/big" "sort" "time" "github.com/ethereum/go-ethereum/ethclient" + + "github.com/sei-protocol/sei-load/utils" ) // UserLatencyTracker tracks user latency by analyzing block transactions type UserLatencyTracker struct { - endpoint string interval time.Duration } @@ -26,11 +26,8 @@ func NewUserLatencyTracker(interval time.Duration) *UserLatencyTracker { // Run starts the user latency tracking loop func (ult *UserLatencyTracker) Run(ctx context.Context, endpoint string) error { - ult.endpoint = endpoint - // Create ticker for the configured interval ticker := time.NewTicker(ult.interval) - defer ticker.Stop() // Connect to the endpoint client, err := ethclient.Dial(endpoint)