diff --git a/README.md b/README.md index 1c8d1c3..1757fe2 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,9 @@ The table below lists the environment variables available and the relevant Tempo | TEMPORAL_TLS_CERT | [ClientOptions.ConnectionOptions.TLS.Certificates](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Cert file | | TEMPORAL_TLS_CA | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS CA Cert file | | PROMETHEUS_ENDPOINT | n/a | The address to serve prometheus metrics on | +| TEMPORAL_DISABLE_ERROR_BACKOFF | n/a | Disable request exponential backoff on work request failure | +| TEMPORAL_BACKOFF_MAX_INTERVAL | n/a | Sets the max interval (seconds) that can be reached by the backoff | +| TEMPORAL_BACKOFF_FACTOR | n/a | Sets the factor the interval is multiplied by | The runner is also configured via command line options: @@ -211,4 +214,4 @@ This activity sleeps for the given number of seconds. It never returns an error. `Echo({ Message: string, Padding?: []byte }) result` -This activity simply returns the message as it's result. This can be used for stress testing polling with activities that return instantly. The optional `Padding` field can be used to increase the size of the activity input in workflow history. \ No newline at end of file +This activity simply returns the message as its result. This can be used for stress testing polling with activities that return instantly. The optional `Padding` field can be used to increase the size of the activity input in workflow history. 
diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 4dfe72e..d803fbc 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -21,12 +21,17 @@ import ( "go.temporal.io/sdk/client" ) -var nWorkflows = flag.Int("c", 10, "concurrent workflows") -var sWorkflow = flag.String("t", "", "workflow type") -var sSignalType = flag.String("s", "", "signal type") -var bWait = flag.Bool("w", true, "wait for workflows to complete") -var sNamespace = flag.String("n", "default", "namespace") -var sTaskQueue = flag.String("tq", "benchmark", "task queue") +var ( + nWorkflows = flag.Int("c", 10, "concurrent workflows") + sWorkflow = flag.String("t", "", "workflow type") + sSignalType = flag.String("s", "", "signal type") + bWait = flag.Bool("w", true, "wait for workflows to complete") + sNamespace = flag.String("n", "default", "namespace") + sTaskQueue = flag.String("tq", "benchmark", "task queue") + nMaxInterval = flag.Int("max-interval", 60, "maximum interval (in seconds) for exponential backoff") + nFactor = flag.Int("backoff-factor", 2, "factor for exponential backoff") + bDisableBackoff = flag.Bool("disable-backoff", false, "disable exponential backoff on errors") +) // Track which flags were explicitly set var flagsSet = make(map[string]bool) @@ -97,6 +102,9 @@ func main() { waitForCompletion := getBoolValue("w", "TEMPORAL_WAIT", *bWait, true) namespace := getStringValue("n", "TEMPORAL_NAMESPACE", *sNamespace, "default") taskQueue := getStringValue("tq", "TEMPORAL_TASK_QUEUE", *sTaskQueue, "benchmark") + disableBackOff := getBoolValue("disable-backoff", "TEMPORAL_DISABLE_ERROR_BACKOFF", *bDisableBackoff, false) + maxInterval := getIntValue("max-interval", "TEMPORAL_BACKOFF_MAX_INTERVAL", *nMaxInterval, 60) + factor := getIntValue("backoff-factor", "TEMPORAL_BACKOFF_FACTOR", *nFactor, 2) log.Printf("Using namespace: %s", namespace) @@ -198,22 +206,58 @@ func main() { } go (func() { + currentInterval := 1 + errChan := make(chan error, concurrentWorkflows) + for { 
pool.Submit(func() { wf, err := starter() if err != nil { - log.Println("Unable to start workflow", err) + fmt.Fprintf(os.Stderr, "Unable to start workflow: %v\n", err) + errChan <- err return } - + if waitForCompletion { err = wf.Get(context.Background(), nil) if err != nil { - log.Println("Workflow failed", err) + fmt.Fprintf(os.Stderr, "Workflow failed: %v\n", err) + errChan <- err return } } + + errChan <- nil }) + + var lastErr error + updated := false + + drainLoop: + for { + select { + case err := <-errChan: + lastErr = err + updated = true + default: + break drainLoop + } + } + + if disableBackOff || !updated { + continue + } + + if lastErr != nil { + fmt.Fprintf(os.Stderr, "Waiting for %d seconds before retrying to start workflow...\n", currentInterval) + time.Sleep(time.Duration(currentInterval) * time.Second) + nInterval := currentInterval * factor + if nInterval < maxInterval && maxInterval != 0 { + currentInterval *= factor + } + } else if lastErr == nil { + currentInterval = 1 + } } })()