diff --git a/README.md b/README.md index 13569a82..e3037a20 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,8 @@ with an external configuration file, like TOML, decoupling connection settings f - [**Eager Workflow Start**](./eager-workflow-start): Demonstrates how to start a workflow in eager mode, an experimental latency optimization. +- [**Task Queue Priority and Fairness**](./task-queue-priority-fairness): Demonstrates Activity Task dispatch using `PriorityKey`, `FairnessKey`, and `FairnessWeight` with a multi-tenant backlog. + ### Dynamic Workflow logic examples These samples demonstrate some common control flow patterns using Temporal's Go SDK API. diff --git a/task-queue-priority-fairness/README.md b/task-queue-priority-fairness/README.md new file mode 100644 index 00000000..862a3575 --- /dev/null +++ b/task-queue-priority-fairness/README.md @@ -0,0 +1,172 @@ +# Task Queue Priority and Fairness + +This sample demonstrates Temporal Task Queue Priority and Fairness for Activity Tasks using a multi-tenant media rendering workload. + +A SaaS rendering platform receives urgent previews, normal renders, and background archive jobs from multiple tenants into one shared Activity Task Queue. The sample intentionally creates backlog so dispatch order is visible: + +- `PriorityKey` chooses which priority level is dispatched first. Lower values run first, so urgent preview jobs use `PriorityKey=1`, normal render jobs use `PriorityKey=3`, and background archive jobs use `PriorityKey=5`. +- `FairnessKey` groups work by tenant within a priority level. Every Activity Task in this sample uses its tenant ID as a non-empty fairness key. +- `FairnessWeight` gives `premium-media` a larger proportional share while it has backlog. The premium tenant uses `FairnessWeight=3.0`; regular tenants use `FairnessWeight=1.0`. + +Priority determines which priority sub-queue Tasks go into. Fairness determines ordering within a given priority level. Fairness ordering is probabilistic and observational, so the exact order can vary between runs. + +## Workload definition + +This sample uses a fixed workload on one shared Activity Task Queue so readers +can predict expected behavior before running it. + +Tenants and fairness weights: + +- `premium-media`: `FairnessWeight=3.0` +- `large-studio`: `FairnessWeight=1.0` +- `small-studio-a`: `FairnessWeight=1.0` +- `small-studio-b`: `FairnessWeight=1.0` + +Job mix: + +- Urgent preview (`PriorityKey=1`) + - `premium-media`: 2 jobs + - `small-studio-a`: 2 jobs +- Normal render (`PriorityKey=3`) + - `large-studio`: 18 jobs + - `small-studio-a`: 3 jobs + - `small-studio-b`: 3 jobs + - `premium-media`: 9 jobs +- Background archive (`PriorityKey=5`) + - `large-studio`: 4 jobs + +Total jobs: `41` (`4 urgent + 33 normal + 4 background`) + +`BuildJobs()` scheduling order: + +1. `large-studio` normal jobs +2. `small-studio-a` normal jobs +3. `small-studio-b` normal jobs +4. `premium-media` normal jobs +5. `large-studio` background jobs +6. Urgent jobs last + +Urgent jobs are intentionally enqueued last so the sample can show that +dispatching follows `PriorityKey` (with backlog), not submission order. + +## Expected ordering at runtime + +In the backlog sample flow (workflow worker first, then starter, then activity +worker), expected behavior is: + +1. Priority across levels: +all urgent (`priority=1`) jobs should appear before queued normal +(`priority=3`) and background (`priority=5`) jobs. +2. Fairness within normal priority: +small-tenant fairness keys should appear before `large-studio` drains all of +its normal backlog. +3. Weighted fairness within normal priority: +`premium-media` should appear repeatedly near the front of normal-priority +dispatch while it still has backlog. + +Fairness is probabilistic, so exact row-by-row ordering can vary between runs. +The sample is designed to validate these patterns, not a deterministic sequence. + +## Run the backlog sample + +### 1. Start Temporal Server with Fairness enabled + +The following dev server config is known to work well for this sample: + +```bash +temporal server start-dev \ + --dynamic-config-value matching.useNewMatcher=true \ + --dynamic-config-value matching.enableFairness=true \ + --dynamic-config-value matching.numTaskqueueReadPartitions=1 \ + --dynamic-config-value matching.numTaskqueueWritePartitions=1 +``` + +Why these options: + +- `matching.useNewMatcher=true`: enables the matcher implementation that supports task queue priority/fairness behavior used by this sample. +- `matching.enableFairness=true`: turns on fairness-aware dispatch so `FairnessKey` and `FairnessWeight` affect ordering within a priority level. +- `matching.numTaskqueueReadPartitions=1` and `matching.numTaskqueueWritePartitions=1`: forces a single partition for this sample, which makes observed ordering easier to interpret and less noisy than multi-partition dispatch. + +### 2. Start only the Workflow Worker + +```bash +go run task-queue-priority-fairness/worker/main.go -mode workflow +``` + +### 3. Start the Workflow + +In another terminal: + +```bash +go run task-queue-priority-fairness/starter/main.go +``` + +The starter prints a reminder to start the Activity Worker. At this point, the Workflow has scheduled many Activity Tasks, but no Activity Worker is polling yet, so the Activity Task Queue has backlog. + +### 4. Start the constrained Activity Worker + +In another terminal: + +```bash +go run task-queue-priority-fairness/worker/main.go -mode activity +``` + +The Activity Worker uses `MaxConcurrentActivityExecutionSize: 1`, making the dispatch/start order easy to observe in logs and in the starter output. + +## What to look for + +The starter prints a table of observed Activity start order and a summary. In the backlog-focused flow, urgent preview jobs submitted last should be dispatched before lower-priority queued work. Within normal render jobs, small tenants should appear before the large tenant drains its backlog, and `premium-media` should receive repeated dispatches while it remains backlogged. + +## Example output + +A successful run should look similar to this. +> [!NOTE] +> Timestamps, Run IDs, and Worker IDs will differ on your machine. + +```text +2026/05/05 00:45:09 Started workflow. WorkflowID task-queue-priority-fairness- RunID +If you are running the full backlog sample, start the Activity Worker now: +go run task-queue-priority-fairness/worker/main.go -mode activity +Activity start order: + +01 started_at= priority=1 fairness_key=premium-media weight=3.0 kind=urgent-preview job=premium-media-urgent-preview-00 +02 started_at= priority=1 fairness_key=premium-media weight=3.0 kind=urgent-preview job=premium-media-urgent-preview-01 +03 started_at= priority=1 fairness_key=small-studio-a weight=1.0 kind=urgent-preview job=small-studio-a-urgent-preview-01 +04 started_at= priority=1 fairness_key=small-studio-a weight=1.0 kind=urgent-preview job=small-studio-a-urgent-preview-00 +05 started_at= priority=3 fairness_key=premium-media weight=3.0 kind=normal-render job=premium-media-normal-render-06 +06 started_at= priority=3 fairness_key=small-studio-b weight=1.0 kind=normal-render job=small-studio-b-normal-render-00 +07 started_at= priority=3 fairness_key=small-studio-a weight=1.0 kind=normal-render job=small-studio-a-normal-render-02 +... +37 started_at= priority=3 fairness_key=large-studio weight=1.0 kind=normal-render job=large-studio-normal-render-17 +38 started_at= priority=5 fairness_key=large-studio weight=1.0 kind=background-archive job=large-studio-background-archive-03 +39 started_at= priority=5 fairness_key=large-studio weight=1.0 kind=background-archive job=large-studio-background-archive-02 +40 started_at= priority=5 fairness_key=large-studio weight=1.0 kind=background-archive job=large-studio-background-archive-01 +41 started_at= priority=5 fairness_key=large-studio weight=1.0 kind=background-archive job=large-studio-background-archive-00 + +Summary: + +Priority: + Urgent jobs use PriorityKey=1. + Normal jobs use PriorityKey=3. + Background jobs use PriorityKey=5. + Urgent work was dispatched before lower-priority queued work: OBSERVED + +Fairness: + Tenant IDs are used as FairnessKey values. + Small tenants appeared before the large tenant drained all normal jobs: OBSERVED + +Weighted fairness: + premium-media uses FairnessWeight=3.0. + Other tenants use FairnessWeight=1.0. + Premium work received repeated dispatches while backlogged: OBSERVED + +Note: + Fairness ordering is probabilistic and can vary between runs. +``` + +How to read this output: + +- Rows `01` to `04` are all urgent (`priority=1`) even though urgent jobs were scheduled last in `BuildJobs()`. This shows priority overtaking queued lower-priority work. +- Normal jobs (`priority=3`) include multiple fairness keys near the front (`premium-media`, `small-studio-a`, `small-studio-b`, `large-studio`) instead of draining one tenant first. +- `premium-media` appears more often in the early normal-priority dispatches while it has queued work, reflecting its `FairnessWeight=3.0` compared with `1.0` for the other tenants. +- Background jobs (`priority=5`) start only after normal-priority work in this run, matching the expected priority behavior. diff --git a/task-queue-priority-fairness/activity.go b/task-queue-priority-fairness/activity.go new file mode 100644 index 00000000..ff693a0f --- /dev/null +++ b/task-queue-priority-fairness/activity.go @@ -0,0 +1,35 @@ +package task_queue_priority_fairness + +import ( + "context" + "time" + + "go.temporal.io/sdk/activity" +) + +func ProcessRenderJob(ctx context.Context, job RenderJob) (RenderResult, error) { + startedAt := time.Now().UTC() + + logger := activity.GetLogger(ctx) + logger.Info( + "Started render job", + "started_at", startedAt, + "priority", job.PriorityKey, + "tenant", job.Tenant, + "weight", job.FairnessWeight, + "kind", job.Kind, + "job_id", job.JobID, + ) + + time.Sleep(150 * time.Millisecond) + + return RenderResult{ + StartedAt: startedAt, + JobID: job.JobID, + Tenant: job.Tenant, + Kind: job.Kind, + PriorityKey: job.PriorityKey, + FairnessKey: job.FairnessKey, + FairnessWeight: job.FairnessWeight, + }, nil +} diff --git a/task-queue-priority-fairness/starter/main.go b/task-queue-priority-fairness/starter/main.go new file mode 100644 index 00000000..dc10a9e7 --- /dev/null +++ b/task-queue-priority-fairness/starter/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/contrib/envconfig" + + task_queue_priority_fairness "github.com/temporalio/samples-go/task-queue-priority-fairness" +) + +func main() { + c, err := client.Dial(envconfig.MustLoadDefaultClientOptions()) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + jobs := task_queue_priority_fairness.BuildJobs() + workflowID := fmt.Sprintf("task-queue-priority-fairness-%d", time.Now().UnixNano()) + workflowOptions := client.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: task_queue_priority_fairness.WorkflowTaskQueue, + } + + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, task_queue_priority_fairness.RenderWorkflow, jobs) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + + log.Println("Started workflow.", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + fmt.Println("If you are running the full backlog sample, start the Activity Worker now:") + fmt.Println("go run task-queue-priority-fairness/worker/main.go -mode activity") + + var results []task_queue_priority_fairness.RenderResult + if err := we.Get(context.Background(), &results); err != nil { + log.Fatalln("Unable to get workflow result", err) + } + + fmt.Println(task_queue_priority_fairness.FormatResults(results)) + summary := task_queue_priority_fairness.SummarizeResults(results) + fmt.Print(task_queue_priority_fairness.FormatSummary(summary)) +} diff --git a/task-queue-priority-fairness/summary.go b/task-queue-priority-fairness/summary.go new file mode 100644 index 00000000..3976c26c --- /dev/null +++ b/task-queue-priority-fairness/summary.go @@ -0,0 +1,160 @@ +package task_queue_priority_fairness + +import ( + "fmt" + "slices" + "sort" + "strings" + "time" +) + +func SortResultsByStartedAt(results []RenderResult) []RenderResult { + sorted := slices.Clone(results) + sort.Slice(sorted, func(i, j int) bool { + if sorted[i].StartedAt.Equal(sorted[j].StartedAt) { + return sorted[i].JobID < sorted[j].JobID + } + return sorted[i].StartedAt.Before(sorted[j].StartedAt) + }) + return sorted +} + +func FormatResults(results []RenderResult) string { + sorted := SortResultsByStartedAt(results) + var builder strings.Builder + builder.WriteString("Activity start order:\n\n") + for index, result := range sorted { + _, _ = fmt.Fprintf( + &builder, + "%02d started_at=%s priority=%d fairness_key=%-14s weight=%.1f kind=%-19s job=%s\n", + index+1, + result.StartedAt.Format(time.RFC3339), + result.PriorityKey, + result.FairnessKey, + result.FairnessWeight, + result.Kind, + result.JobID, + ) + } + return builder.String() +} + +func SummarizeResults(results []RenderResult) Summary { + sorted := SortResultsByStartedAt(results) + summary := Summary{} + + // Use the first 12 normal-priority starts as an "early window" for weighted + // fairness observation. In this sample we enqueue 33 normal jobs total (9 + // for premium-media), so 12 is large enough to observe repeated premium + // dispatches while backlog still exists, without requiring an exact ratio. + const earlyNormalWindow = 12 + + // Priority is observed by comparing the boundary timestamps between + // priority groups. If the last urgent job starts before the first normal and + // background jobs, priority overtook the lower-priority backlog in this run. + var firstNormal, firstBackground, lastUrgent time.Time + + // Fairness is observed by looking only within normal-priority work. A small + // tenant appearing before the large tenant's last normal job shows that the + // large tenant did not monopolize the normal-priority queue. + lastLargeNormalIndex := -1 + firstSmallNormalIndex := -1 + + // Weighted fairness is intentionally checked as a soft observation. The + // first 12 normal jobs are an early window while premium-media should still + // be backlogged; seeing premium more than once there suggests its larger + // FairnessWeight is giving it repeated dispatches without requiring an exact + // deterministic ratio. + premiumNormalCount := 0 + premiumInEarlyNormal := 0 + normalCount := 0 + + for index, result := range sorted { + switch result.PriorityKey { + case 1: + // Urgent work must all start before lower-priority work for the + // priority observation to pass, so keep the latest urgent start time. + if lastUrgent.IsZero() || result.StartedAt.After(lastUrgent) { + lastUrgent = result.StartedAt + } + case 3: + // Normal work is the level where we demonstrate fairness between + // tenants, so track both its first start time and tenant positions. + if firstNormal.IsZero() || result.StartedAt.Before(firstNormal) { + firstNormal = result.StartedAt + } + if result.FairnessKey == TenantLargeStudio { + lastLargeNormalIndex = index + } + if (result.FairnessKey == TenantSmallStudioA || result.FairnessKey == TenantSmallStudioB) && firstSmallNormalIndex == -1 { + firstSmallNormalIndex = index + } + normalCount++ + if result.FairnessKey == TenantPremiumMedia { + premiumNormalCount++ + // Count premium appearances only in the early normal-priority + // window. This checks that the higher weight is visible while + // premium-media still has queued work, without expecting an exact + // 3:1 ordering in a small sample. + if normalCount <= earlyNormalWindow { + premiumInEarlyNormal++ + } + } + case 5: + // Background work should wait behind urgent work in the priority + // sample, so track the first background start time. + if firstBackground.IsZero() || result.StartedAt.Before(firstBackground) { + firstBackground = result.StartedAt + } + } + } + + summary.FirstNormalStartedAt = firstNormal + summary.LastUrgentStartedAt = lastUrgent + + // Priority is observed only if every urgent job started before any normal or + // background job. Missing timestamps mean the run did not include enough + // data to make that observation. + summary.PriorityObserved = !lastUrgent.IsZero() && !firstNormal.IsZero() && !firstBackground.IsZero() && + lastUrgent.Before(firstNormal) && lastUrgent.Before(firstBackground) + + // Fairness is observed if a small tenant appears before large-studio has + // drained all of its normal-priority work. + summary.FairnessObserved = firstSmallNormalIndex != -1 && lastLargeNormalIndex != -1 && firstSmallNormalIndex < lastLargeNormalIndex + + // Weighted fairness is observed if premium-media had multiple normal jobs + // and received repeated dispatches in the early normal-priority window. + summary.WeightedFairnessObserved = premiumNormalCount > 1 && premiumInEarlyNormal > 1 + + return summary +} + +func FormatSummary(summary Summary) string { + return fmt.Sprintf(`Summary: + +Priority: + Urgent jobs use PriorityKey=1. + Normal jobs use PriorityKey=3. + Background jobs use PriorityKey=5. + Urgent work was dispatched before lower-priority queued work: %s + +Fairness: + Tenant IDs are used as FairnessKey values. + Small tenants appeared before the large tenant drained all normal jobs: %s + +Weighted fairness: + premium-media uses FairnessWeight=3.0. + Other tenants use FairnessWeight=1.0. + Premium work received repeated dispatches while backlogged: %s + +Note: + Fairness ordering is probabilistic and can vary between runs. +`, observed(summary.PriorityObserved), observed(summary.FairnessObserved), observed(summary.WeightedFairnessObserved)) +} + +func observed(value bool) string { + if value { + return "OBSERVED" + } + return "NOT OBSERVED IN THIS RUN" +} diff --git a/task-queue-priority-fairness/worker/main.go b/task-queue-priority-fairness/worker/main.go new file mode 100644 index 00000000..aa95c163 --- /dev/null +++ b/task-queue-priority-fairness/worker/main.go @@ -0,0 +1,52 @@ +package main + +import ( + "flag" + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/contrib/envconfig" + "go.temporal.io/sdk/worker" + + task_queue_priority_fairness "github.com/temporalio/samples-go/task-queue-priority-fairness" +) + +func main() { + mode := flag.String("mode", "workflow", "worker mode: workflow or activity") + flag.Parse() + + c, err := client.Dial(envconfig.MustLoadDefaultClientOptions()) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + switch *mode { + case "workflow": + workflowWorker := newWorkflowWorker(c) + if err := workflowWorker.Run(worker.InterruptCh()); err != nil { + log.Fatalln("Unable to start workflow worker", err) + } + case "activity": + activityWorker := newActivityWorker(c) + if err := activityWorker.Run(worker.InterruptCh()); err != nil { + log.Fatalln("Unable to start activity worker", err) + } + default: + log.Fatalf("Unknown mode %q. Use workflow or activity.", *mode) + } +} + +func newWorkflowWorker(c client.Client) worker.Worker { + w := worker.New(c, task_queue_priority_fairness.WorkflowTaskQueue, worker.Options{}) + w.RegisterWorkflow(task_queue_priority_fairness.RenderWorkflow) + return w +} + +func newActivityWorker(c client.Client) worker.Worker { + w := worker.New(c, task_queue_priority_fairness.ActivityTaskQueue, worker.Options{ + MaxConcurrentActivityExecutionSize: 1, + }) + w.RegisterActivity(task_queue_priority_fairness.ProcessRenderJob) + return w +} diff --git a/task-queue-priority-fairness/workflow.go b/task-queue-priority-fairness/workflow.go new file mode 100644 index 00000000..d74a84c6 --- /dev/null +++ b/task-queue-priority-fairness/workflow.go @@ -0,0 +1,110 @@ +package task_queue_priority_fairness + +import ( + "fmt" + "time" + + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" +) + +const WorkflowTaskQueue = "task-queue-priority-fairness-workflow" +const ActivityTaskQueue = "task-queue-priority-fairness-activity" + +const ( + TenantLargeStudio = "large-studio" + TenantSmallStudioA = "small-studio-a" + TenantSmallStudioB = "small-studio-b" + TenantPremiumMedia = "premium-media" +) + +var TenantWeights = map[string]float64{ + TenantLargeStudio: 1.0, + TenantSmallStudioA: 1.0, + TenantSmallStudioB: 1.0, + TenantPremiumMedia: 3.0, +} + +type RenderJob struct { + JobID string + Tenant string + Kind string + PriorityKey int + FairnessKey string + FairnessWeight float64 +} + +type RenderResult struct { + StartedAt time.Time + JobID string + Tenant string + Kind string + PriorityKey int + FairnessKey string + FairnessWeight float64 +} + +type Summary struct { + PriorityObserved bool + FairnessObserved bool + WeightedFairnessObserved bool + FirstNormalStartedAt time.Time + LastUrgentStartedAt time.Time +} + +func RenderWorkflow(ctx workflow.Context, jobs []RenderJob) ([]RenderResult, error) { + futures := make([]workflow.Future, 0, len(jobs)) + + for _, job := range jobs { + ao := workflow.ActivityOptions{ + TaskQueue: ActivityTaskQueue, + StartToCloseTimeout: time.Minute, + Priority: temporal.Priority{ + PriorityKey: job.PriorityKey, + FairnessKey: job.FairnessKey, + FairnessWeight: float32(job.FairnessWeight), + }, + } + activityCtx := workflow.WithActivityOptions(ctx, ao) + future := workflow.ExecuteActivity(activityCtx, ProcessRenderJob, job) + futures = append(futures, future) + } + + results := make([]RenderResult, 0, len(futures)) + for _, future := range futures { + var result RenderResult + if err := future.Get(ctx, &result); err != nil { + return nil, err + } + results = append(results, result) + } + + return results, nil +} + +func BuildJobs() []RenderJob { + jobs := make([]RenderJob, 0, 41) + jobs = append(jobs, makeJobs(TenantLargeStudio, 18, "normal-render", 3)...) + jobs = append(jobs, makeJobs(TenantSmallStudioA, 3, "normal-render", 3)...) + jobs = append(jobs, makeJobs(TenantSmallStudioB, 3, "normal-render", 3)...) + jobs = append(jobs, makeJobs(TenantPremiumMedia, 9, "normal-render", 3)...) + jobs = append(jobs, makeJobs(TenantLargeStudio, 4, "background-archive", 5)...) + jobs = append(jobs, makeJobs(TenantPremiumMedia, 2, "urgent-preview", 1)...) + jobs = append(jobs, makeJobs(TenantSmallStudioA, 2, "urgent-preview", 1)...) + return jobs +} + +func makeJobs(tenant string, count int, kind string, priorityKey int) []RenderJob { + jobs := make([]RenderJob, 0, count) + for i := 0; i < count; i++ { + jobs = append(jobs, RenderJob{ + JobID: fmt.Sprintf("%s-%s-%02d", tenant, kind, i), + Tenant: tenant, + Kind: kind, + PriorityKey: priorityKey, + FairnessKey: tenant, + FairnessWeight: TenantWeights[tenant], + }) + } + return jobs +}