Skip to content
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ The table below lists the environment variables available and the relevant Tempo
| TEMPORAL_TLS_CERT | [ClientOptions.ConnectionOptions.TLS.Certificates](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS Cert file |
| TEMPORAL_TLS_CA | [ClientOptions.ConnectionOptions.TLS](https://pkg.go.dev/go.temporal.io/sdk@v1.15.0/internal#ConnectionOptions) | Path to TLS CA Cert file |
| PROMETHEUS_ENDPOINT | n/a | The address to serve prometheus metrics on |
| TEMPORAL_DISABLE_ERROR_BACKOFF | n/a | Disable request expotential backoff on work request failure |
| TEMPORAL_BACKOFF_MAX_INTERVAL | n/a | Sets the max interval (seconds) that can be reached by the backoff |
| TEMPORAL_BACKOFF_FACTOR | n/a | Sets the factor the interval is multiplied by |

The runner is also configured via command line options:

Expand Down Expand Up @@ -211,4 +214,4 @@ This activity sleeps for the given number of seconds. It never returns an error.

`Echo({ Message: string, Padding?: []byte }) result`

This activity simply returns the message as it's result. This can be used for stress testing polling with activities that return instantly. The optional `Padding` field can be used to increase the size of the activity input in workflow history.
This activity simply returns the message as it's result. This can be used for stress testing polling with activities that return instantly. The optional `Padding` field can be used to increase the size of the activity input in workflow history.
62 changes: 53 additions & 9 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ import (
"go.temporal.io/sdk/client"
)

var nWorkflows = flag.Int("c", 10, "concurrent workflows")
var sWorkflow = flag.String("t", "", "workflow type")
var sSignalType = flag.String("s", "", "signal type")
var bWait = flag.Bool("w", true, "wait for workflows to complete")
var sNamespace = flag.String("n", "default", "namespace")
var sTaskQueue = flag.String("tq", "benchmark", "task queue")
var (
nWorkflows = flag.Int("c", 10, "concurrent workflows")
sWorkflow = flag.String("t", "", "workflow type")
sSignalType = flag.String("s", "", "signal type")
bWait = flag.Bool("w", true, "wait for workflows to complete")
sNamespace = flag.String("n", "default", "namespace")
sTaskQueue = flag.String("tq", "benchmark", "task queue")
nMaxInterval = flag.Int("max-interval", 60, "maximum interval (in seconds) for exponential backoff")
nFactor = flag.Int("backoff-factor", 2, "factor for exponential backoff")
bDisableBackoff = flag.Bool("disable-backoff", false, "disable exponential backoff on errors")
)

// Track which flags were explicitly set
var flagsSet = make(map[string]bool)
Expand Down Expand Up @@ -97,6 +102,9 @@ func main() {
waitForCompletion := getBoolValue("w", "TEMPORAL_WAIT", *bWait, true)
namespace := getStringValue("n", "TEMPORAL_NAMESPACE", *sNamespace, "default")
taskQueue := getStringValue("tq", "TEMPORAL_TASK_QUEUE", *sTaskQueue, "benchmark")
disableBackOff := getBoolValue("disable-backoff", "TEMPORAL_DISABLE_ERROR_BACKOFF", *bDisableBackoff, false)
Comment thread
robholland marked this conversation as resolved.
maxInterval := getIntValue("max-interval", "TEMPORAL_BACKOFF_MAX_INTERVAL", *nMaxInterval, 60)
factor := getIntValue("backoff-factor", "TEMPORAL_BACKOFF_FACTOR", *nFactor, 2)

log.Printf("Using namespace: %s", namespace)

Expand Down Expand Up @@ -198,22 +206,58 @@ func main() {
}

go (func() {
currentInterval := 1
errChan := make(chan error, concurrentWorkflows)

for {
pool.Submit(func() {
wf, err := starter()
if err != nil {
log.Println("Unable to start workflow", err)
fmt.Fprintf(os.Stderr, "Unable to start workflow: %v\n", err)
errChan <- err
return
}

if waitForCompletion {
err = wf.Get(context.Background(), nil)
if err != nil {
log.Println("Workflow failed", err)
fmt.Fprintf(os.Stderr, "Workflow failed: %v\n", err)
errChan <- err
Comment thread
robholland marked this conversation as resolved.
return
}
}

errChan <- nil
})

var lastErr error
updated := false

drainLoop:
for {
select {
case err := <-errChan:
lastErr = err
updated = true
default:
break drainLoop
}
}

if disableBackOff || !updated {
continue
}

if lastErr != nil {
fmt.Fprintf(os.Stderr, "Waiting for %d seconds before retrying to start workflow...\n", currentInterval)
time.Sleep(time.Duration(currentInterval) * time.Second)
nInterval := currentInterval * factor
if nInterval < maxInterval && maxInterval != 0 {
currentInterval *= factor
}
} else if lastErr == nil {
currentInterval = 1
}
}
})()

Expand Down
Loading