From da87c1cd3e67ea00bd4d33e7eb7c5656ba9033f8 Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Mon, 3 Nov 2025 12:01:15 -0700 Subject: [PATCH 1/9] Adding sleep timer to avoid overwhelming cluster with requests --- cmd/runner/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 4dfe72e..d5b33b8 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -206,6 +206,8 @@ func main() { return } + time.Sleep(500 * time.Millisecond) // slight delay to avoid overwhelming the server + if waitForCompletion { err = wf.Get(context.Background(), nil) if err != nil { From 432e25bc5208c06663b78d81dcddcbf620ca6c99 Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Mon, 3 Nov 2025 12:26:50 -0700 Subject: [PATCH 2/9] Moving sleep timer into the error --- cmd/runner/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/runner/main.go b/cmd/runner/main.go index d5b33b8..b3b0261 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -203,11 +203,10 @@ func main() { wf, err := starter() if err != nil { log.Println("Unable to start workflow", err) + time.Sleep(500 * time.Millisecond) // slight delay to avoid overwhelming the server return } - time.Sleep(500 * time.Millisecond) // slight delay to avoid overwhelming the server - if waitForCompletion { err = wf.Get(context.Background(), nil) if err != nil { From 8cbfdae06bae3b5b7229abff9a3f58d9935ea5b9 Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Mon, 3 Nov 2025 13:14:08 -0700 Subject: [PATCH 3/9] Adding regex check for StandbyCluster error --- cmd/runner/main.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/cmd/runner/main.go b/cmd/runner/main.go index b3b0261..b8f0242 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -19,15 +19,19 @@ import ( "go.uber.org/automaxprocs/maxprocs" "go.temporal.io/sdk/client" + "regexp" ) -var nWorkflows = flag.Int("c", 10, "concurrent workflows") -var sWorkflow = flag.String("t", "", "workflow type") -var sSignalType = flag.String("s", "", "signal type") -var bWait = flag.Bool("w", true, "wait for workflows to complete") -var sNamespace = flag.String("n", "default", "namespace") -var sTaskQueue = flag.String("tq", "benchmark", "task queue") +var ( + nWorkflows = flag.Int("c", 10, "concurrent workflows") + sWorkflow = flag.String("t", "", "workflow type") + sSignalType = flag.String("s", "", "signal type") + bWait = flag.Bool("w", true, "wait for workflows to complete") + sNamespace = flag.String("n", "default", "namespace") + sTaskQueue = flag.String("tq", "benchmark", "task queue") + errorCheckIfErrorIsForStandbyError = regexp.MustCompile(`Namespace: .+ is active in cluster: .+, while current cluster .+ is a standby cluster.`) +) // Track which flags were explicitly set var flagsSet = make(map[string]bool) @@ -203,7 +207,9 @@ func main() { wf, err := starter() if err != nil { log.Println("Unable to start workflow", err) - time.Sleep(500 * time.Millisecond) // slight delay to avoid overwhelming the server + if errorCheckIfErrorIsForStandbyError.MatchString(err.Error()) { + time.Sleep(1 * time.Second) // slight delay to avoid overwhelming the server + } return } From 7a82db2d7546340852986c71b12649e142f0dcdf Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Mon, 3 Nov 2025 13:50:10 -0700 Subject: [PATCH 4/9] Adding off exponetial backoff --- cmd/runner/main.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/cmd/runner/main.go b/cmd/runner/main.go index b8f0242..be40d01 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -19,7 +19,6 @@ import ( "go.uber.org/automaxprocs/maxprocs" "go.temporal.io/sdk/client" - "regexp" ) var ( @@ -29,9 +28,12 @@ var ( bWait = flag.Bool("w", true, "wait for workflows to complete") sNamespace = flag.String("n", "default", "namespace") sTaskQueue = flag.String("tq", "benchmark", "task queue") - - errorCheckIfErrorIsForStandbyError = regexp.MustCompile(`Namespace: .+ is active in cluster: .+, while current cluster .+ is a standby cluster.`) + nmaxInterval = flag.Int("max-interval", 60, "maximum interval (in seconds) for exponential backoff") + nfactor = flag.Int("backoff-factor", 2, "factor for exponential backoff") + bBackoff = flag.Bool("disable-backoff", false, "disable exponential backoff on errors") + ) + // Track which flags were explicitly set var flagsSet = make(map[string]bool) @@ -101,6 +103,9 @@ func main() { waitForCompletion := getBoolValue("w", "TEMPORAL_WAIT", *bWait, true) namespace := getStringValue("n", "TEMPORAL_NAMESPACE", *sNamespace, "default") taskQueue := getStringValue("tq", "TEMPORAL_TASK_QUEUE", *sTaskQueue, "benchmark") + backOff := getBoolValue("disable-backoff", "TEMPORAL_DISABLE_ERROR_BACKOFF", *bBackoff, false) + maxInterval := getIntValue("max-interval", "TEMPORAL_BACKOFF_MAX_INTERVAL", *nmaxInterval, 60) + factor := getIntValue("backoff-factor", "TEMPORAL_BACKOFF_FACTOR", *nfactor, 2) log.Printf("Using namespace: %s", namespace) @@ -202,14 +207,14 @@ func main() { } go (func() { + var errorOccurred bool=false + currentInterval := 1 for { pool.Submit(func() { wf, err := starter() if err != nil { log.Println("Unable to start workflow", err) - if errorCheckIfErrorIsForStandbyError.MatchString(err.Error()) { - time.Sleep(1 * time.Second) // slight delay to avoid overwhelming the server - } + errorOccurred = true return } @@ -221,6 +226,18 @@ func main() { } } }) + if errorOccurred && !backOff{ + currentInterval *= factor + + if currentInterval > maxInterval { + log.Println("Unable to start workflow after retries", err) + os.Exit(1) + } + + log.Printf("Waiting for %d seconds before retrying to start workflow...", currentInterval) + time.Sleep(time.Duration(currentInterval) * time.Second) + errorOccurred = false + } } })() From b1bf44ea980e16b3a46fd095b54aa115a9d45df0 Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Mon, 3 Nov 2025 13:52:30 -0700 Subject: [PATCH 5/9] Adding ability to skip maxIteration --- cmd/runner/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/runner/main.go b/cmd/runner/main.go index be40d01..a35f254 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -31,7 +31,6 @@ var ( nmaxInterval = flag.Int("max-interval", 60, "maximum interval (in seconds) for exponential backoff") nfactor = flag.Int("backoff-factor", 2, "factor for exponential backoff") bBackoff = flag.Bool("disable-backoff", false, "disable exponential backoff on errors") - ) // Track which flags were explicitly set @@ -229,7 +228,7 @@ func main() { if errorOccurred && !backOff{ currentInterval *= factor - if currentInterval > maxInterval { + if currentInterval > maxInterval && maxInterval != 0{ log.Println("Unable to start workflow after retries", err) os.Exit(1) } From c57adc1d030dde21fb40e1afd6cff1fc031f2b29 Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Tue, 4 Nov 2025 09:51:49 -0700 Subject: [PATCH 6/9] Addressing PR comments --- cmd/runner/main.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/cmd/runner/main.go b/cmd/runner/main.go index a35f254..39fcb63 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -30,7 +30,7 @@ var ( sTaskQueue = flag.String("tq", "benchmark", "task queue") nmaxInterval = flag.Int("max-interval", 60, "maximum interval (in seconds) for exponential backoff") nfactor = flag.Int("backoff-factor", 2, "factor for exponential backoff") - bBackoff = flag.Bool("disable-backoff", false, "disable exponential backoff on errors") + bDisableBackoff = flag.Bool("disable-backoff", false, "disable exponential backoff on errors") ) // Track which flags were explicitly set @@ -102,7 +102,7 @@ func main() { waitForCompletion := getBoolValue("w", "TEMPORAL_WAIT", *bWait, true) namespace := getStringValue("n", "TEMPORAL_NAMESPACE", *sNamespace, "default") taskQueue := getStringValue("tq", "TEMPORAL_TASK_QUEUE", *sTaskQueue, "benchmark") - backOff := getBoolValue("disable-backoff", "TEMPORAL_DISABLE_ERROR_BACKOFF", *bBackoff, false) + backOff := getBoolValue("disable-backoff", "TEMPORAL_DISABLE_ERROR_BACKOFF", *bDisableBackoff, false) maxInterval := getIntValue("max-interval", "TEMPORAL_BACKOFF_MAX_INTERVAL", *nmaxInterval, 60) factor := getIntValue("backoff-factor", "TEMPORAL_BACKOFF_FACTOR", *nfactor, 2) @@ -212,7 +212,8 @@ func main() { pool.Submit(func() { wf, err := starter() if err != nil { - log.Println("Unable to start workflow", err) + // log.Println("Unable to start workflow", err) + fmt.Fprintf(os.Stderr, "Unable to start workflow: %v\n", err) errorOccurred = true return } @@ -221,19 +222,20 @@ func main() { err = wf.Get(context.Background(), nil) if err != nil { log.Println("Workflow failed", err) + errorOccurred = true return } } + if !errorOccurred { + currentInterval = 1 + } }) if errorOccurred && !backOff{ - currentInterval *= factor - - if currentInterval > maxInterval && maxInterval != 0{ - log.Println("Unable to start workflow after retries", err) - os.Exit(1) + if currentInterval < maxInterval && maxInterval != 0 { + currentInterval *= factor } - log.Printf("Waiting for %d seconds before retrying to start workflow...", currentInterval) + fmt.Fprintln(os.Stderr, fmt.Sprintf("Waiting for %d seconds before retrying to start workflow...", currentInterval)) time.Sleep(time.Duration(currentInterval) * time.Second) errorOccurred = false } From 53f0c4a6994c543bfd95a22b1d74f8b5e1b20d5f Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Thu, 6 Nov 2025 10:42:29 -0700 Subject: [PATCH 7/9] Using err channel + last error instead of any error for err detection + backoff --- cmd/runner/main.go | 47 ++++++++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 39fcb63..02e3fe4 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -102,7 +102,7 @@ func main() { waitForCompletion := getBoolValue("w", "TEMPORAL_WAIT", *bWait, true) namespace := getStringValue("n", "TEMPORAL_NAMESPACE", *sNamespace, "default") taskQueue := getStringValue("tq", "TEMPORAL_TASK_QUEUE", *sTaskQueue, "benchmark") - backOff := getBoolValue("disable-backoff", "TEMPORAL_DISABLE_ERROR_BACKOFF", *bDisableBackoff, false) + disableBackOff := getBoolValue("disable-backoff", "TEMPORAL_DISABLE_ERROR_BACKOFF", *bDisableBackoff, false) maxInterval := getIntValue("max-interval", "TEMPORAL_BACKOFF_MAX_INTERVAL", *nmaxInterval, 60) factor := getIntValue("backoff-factor", "TEMPORAL_BACKOFF_FACTOR", *nfactor, 2) @@ -206,38 +206,53 @@ func main() { } go (func() { - var errorOccurred bool=false currentInterval := 1 + errChan := make(chan error, concurrentWorkflows) + for { pool.Submit(func() { wf, err := starter() if err != nil { - // log.Println("Unable to start workflow", err) fmt.Fprintf(os.Stderr, "Unable to start workflow: %v\n", err) - errorOccurred = true + errChan <- err return } - + if waitForCompletion { err = wf.Get(context.Background(), nil) if err != nil { log.Println("Workflow failed", err) - errorOccurred = true + errChan <- err return } } - if !errorOccurred { - currentInterval = 1 - } + + errChan <- nil }) - if errorOccurred && !backOff{ - if currentInterval < maxInterval && maxInterval != 0 { - currentInterval *= factor + + var lastErr error + drained := false + for { + select { + case err := <-errChan: + lastErr = err + drained = true + default: + goto checkError } - - fmt.Fprintln(os.Stderr, fmt.Sprintf("Waiting for %d seconds before retrying to start workflow...", currentInterval)) - time.Sleep(time.Duration(currentInterval) * time.Second) - errorOccurred = false + } + + checkError: + if drained && lastErr != nil { + if !disableBackOff { + if currentInterval < maxInterval && maxInterval != 0 { + currentInterval *= factor + } + fmt.Fprintf(os.Stderr, "Waiting for %d seconds before retrying to start workflow...\n", currentInterval) + time.Sleep(time.Duration(currentInterval) * time.Second) + } + } else if drained && lastErr == nil { + currentInterval = 1 } } })() From 90e9a9bf45535bcd928ff7c5c75c181bd3464d05 Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Fri, 7 Nov 2025 14:29:58 -0700 Subject: [PATCH 8/9] Addressing PR comments (Removing GoTo & Updating README) --- README.md | 5 ++++- cmd/runner/main.go | 38 +++++++++++++++++++++----------------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 1c8d1c3..1757fe2 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,9 @@ The table below lists the environment variables available and the relevant Tempo | TEMPORAL_TLS_CERT | [ClientOptions.ConnectionOptions.TLS.Certificates](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Cert file | | TEMPORAL_TLS_CA | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS CA Cert file | | PROMETHEUS_ENDPOINT | n/a | The address to serve prometheus metrics on | +| TEMPORAL_DISABLE_ERROR_BACKOFF | n/a | Disable request expotential backoff on work request failure | +| TEMPORAL_BACKOFF_MAX_INTERVAL | n/a | Sets the max interval (seconds) that can be reached by the backoff | +| TEMPORAL_BACKOFF_FACTOR | n/a | Sets the factor the interval is multiplied by | The runner is also configured via command line options: @@ -211,4 +214,4 @@ This activity sleeps for the given number of seconds. It never returns an error. `Echo({ Message: string, Padding?: []byte }) result` -This activity simply returns the message as it's result. This can be used for stress testing polling with activities that return instantly. The optional `Padding` field can be used to increase the size of the activity input in workflow history. \ No newline at end of file +This activity simply returns the message as it's result. This can be used for stress testing polling with activities that return instantly. The optional `Padding` field can be used to increase the size of the activity input in workflow history. diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 02e3fe4..7758d04 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -28,8 +28,8 @@ var ( bWait = flag.Bool("w", true, "wait for workflows to complete") sNamespace = flag.String("n", "default", "namespace") sTaskQueue = flag.String("tq", "benchmark", "task queue") - nmaxInterval = flag.Int("max-interval", 60, "maximum interval (in seconds) for exponential backoff") - nfactor = flag.Int("backoff-factor", 2, "factor for exponential backoff") + nMaxInterval = flag.Int("max-interval", 60, "maximum interval (in seconds) for exponential backoff") + nFactor = flag.Int("backoff-factor", 2, "factor for exponential backoff") bDisableBackoff = flag.Bool("disable-backoff", false, "disable exponential backoff on errors") ) @@ -103,8 +103,8 @@ func main() { namespace := getStringValue("n", "TEMPORAL_NAMESPACE", *sNamespace, "default") taskQueue := getStringValue("tq", "TEMPORAL_TASK_QUEUE", *sTaskQueue, "benchmark") disableBackOff := getBoolValue("disable-backoff", "TEMPORAL_DISABLE_ERROR_BACKOFF", *bDisableBackoff, false) - maxInterval := getIntValue("max-interval", "TEMPORAL_BACKOFF_MAX_INTERVAL", *nmaxInterval, 60) - factor := getIntValue("backoff-factor", "TEMPORAL_BACKOFF_FACTOR", *nfactor, 2) + maxInterval := getIntValue("max-interval", "TEMPORAL_BACKOFF_MAX_INTERVAL", *nMaxInterval, 60) + factor := getIntValue("backoff-factor", "TEMPORAL_BACKOFF_FACTOR", *nFactor, 2) log.Printf("Using namespace: %s", namespace) @@ -221,7 +221,7 @@ func main() { if waitForCompletion { err = wf.Get(context.Background(), nil) if err != nil { - log.Println("Workflow failed", err) + fmt.Fprintf(os.Stderr, "Workflow failed: %v\n", err) errChan <- err return } @@ -231,27 +231,31 @@ func main() { }) var lastErr error - drained := false + updated := false + + drainLoop: for { select { case err := <-errChan: lastErr = err - drained = true + updated = true default: - goto checkError + break drainLoop } } - checkError: - if drained && lastErr != nil { - if !disableBackOff { - if currentInterval < maxInterval && maxInterval != 0 { - currentInterval *= factor - } - fmt.Fprintf(os.Stderr, "Waiting for %d seconds before retrying to start workflow...\n", currentInterval) - time.Sleep(time.Duration(currentInterval) * time.Second) + if disableBackOff || !updated { + continue + } + + if updated && lastErr != nil { + fmt.Fprintf(os.Stderr, "Waiting for %d seconds before retrying to start workflow...\n", currentInterval) + time.Sleep(time.Duration(currentInterval) * time.Second) + nInterval := currentInterval * factor + if nInterval < maxInterval && maxInterval != 0 { + currentInterval *= factor } - } else if drained && lastErr == nil { + } else if updated && lastErr == nil { currentInterval = 1 } } From 96616e81db4bb7d79ef9e8656a4f56b721e07238 Mon Sep 17 00:00:00 2001 From: PhilHenning Date: Fri, 7 Nov 2025 14:50:19 -0700 Subject: [PATCH 9/9] Updating if/else to lastErr --- cmd/runner/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 7758d04..d803fbc 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -248,14 +248,14 @@ func main() { continue } - if updated && lastErr != nil { + if lastErr != nil { fmt.Fprintf(os.Stderr, "Waiting for %d seconds before retrying to start workflow...\n", currentInterval) time.Sleep(time.Duration(currentInterval) * time.Second) nInterval := currentInterval * factor if nInterval < maxInterval && maxInterval != 0 { currentInterval *= factor } - } else if updated && lastErr == nil { + } else if lastErr == nil { currentInterval = 1 } }