
Integration: Add Kueue integration for queue-based admission control and namespace-level resource quotas across TaskSpawners #930


🤖 Kelos Strategist Agent @gjkim42

Area: Integration Opportunities

Summary

Kelos has no mechanism for coordinating resource consumption across TaskSpawners. Each spawner independently enforces its own maxConcurrency, but there is no cluster-level or namespace-level admission control, fair scheduling, or resource quota system. Kueue — the Kubernetes-native job queueing system (a SIG-Scheduling sub-project, now GA) — solves exactly this problem for batch workloads. This proposal adds a queueName field to TaskSpec and TaskTemplate so that Kelos Jobs are admitted through Kueue's quota and scheduling system, giving operators cross-spawner resource governance with zero custom CRD implementation.

Problem

1. Per-spawner maxConcurrency doesn't protect the cluster

Each TaskSpawner enforces its own maxConcurrency independently (cmd/kelos-spawner/main.go:312-328):

if maxConcurrency > 0 && int32(activeTasks) >= maxConcurrency {
    log.Info("Max concurrency reached, skipping remaining items",
             "activeTasks", activeTasks, "maxConcurrency", maxConcurrency)
    break
}

Spawners don't coordinate. A deployment with 9 TaskSpawners (like the self-development setup) could have individual maxConcurrency values that sum to 19 concurrent tasks. In a burst scenario (issues filed, cron triggers, and PR reviews all arriving simultaneously), the cluster would need to absorb all 19 agent pods at once — each requesting 512Mi–2Gi memory — with no backpressure.

Issue #675 (ConcurrencyPolicy CRD) proposes a Kelos-native solution to this. However, building custom cross-spawner coordination is non-trivial: it requires a centralized admission controller, distributed locking, and careful handling of race conditions between multiple spawner pods. Kueue has already solved all of this.

2. No model-aware or cost-aware scheduling

All agent tasks are scheduled identically regardless of model cost. An Opus task ($15–75/hour of compute at current token prices) and a Haiku task ($0.25–1/hour) compete equally for execution slots. Operators cannot express "allow up to 10 Haiku tasks but max 2 Opus tasks" without creating separate namespaces — which breaks branch locking (scoped to namespace in task_controller.go:785-791).

3. No fair sharing between spawners

When resources are constrained, the spawner that polls first wins. There's no weighted fair sharing — a high-volume kelos-triage spawner (maxConcurrency=8) can starve a critical kelos-workers spawner (maxConcurrency=3) simply by discovering work items first.

4. No preemption for priority work

All tasks are equal. A critical production bug fix competes with routine code cleanup for execution slots. PriorityLabels sorts items within a single spawner's discovery cycle, but provides no cross-spawner priority.

Why Kueue

Kueue is the SIG-Scheduling standard for batch job admission in Kubernetes. It provides:

  • ClusterQueues: Cluster-level resource pools with quotas (CPU, memory, custom resources)
  • LocalQueues: Namespace-scoped entry points that map to ClusterQueues
  • Fair sharing: Weighted fair scheduling across queues with borrowing and lending
  • Priority & preemption: WorkloadPriority classes with configurable preemption strategies
  • Flavor-based quotas: Different resource quotas per hardware tier (e.g., GPU vs CPU nodes)
  • Multi-tenant isolation: Cohort-based resource partitioning

Kueue's integration model is minimal: Jobs are created with suspend: true and a queue label. Kueue unsuspends them when quota is available. This is a perfect fit for Kelos, which already creates standard batch/v1 Jobs.
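To make the integration model concrete, a Kelos Job queued through Kueue would look roughly like the following. This is an illustrative sketch, not output from the codebase — the Job name, image, and resource request are placeholders:

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: task-fix-issue-123            # hypothetical Task name
  labels:
    kueue.x-k8s.io/queue-name: kelos-default
spec:
  suspend: true                       # Kueue flips this to false when quota is available
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: agent
          image: example.com/kelos-agent:latest   # placeholder image
          resources:
            requests:
              memory: 1Gi             # counted against the ClusterQueue quota
```

The only difference from a Job Kelos creates today is the `suspend: true` field and the queue-name label; everything else is untouched.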

Kueue vs custom ConcurrencyPolicy (#675)

Capability             ConcurrencyPolicy (#675)          Kueue
Cross-spawner limits   Proposed (needs implementation)   Built-in
Model-based quotas     Proposed (needs implementation)   Via ResourceFlavors
Fair sharing           Not proposed                      Built-in with weights
Priority preemption    Not proposed                      Built-in
Scheduling windows     Proposed                          Via admission checks
Maintenance burden     New CRD + controller              Maintained by SIG-Scheduling
Maturity               Proposed                          GA (v1.0+)

Kueue doesn't eliminate all need for #675 (e.g., scheduling windows are Kelos-specific), but it covers the resource management and fairness aspects with battle-tested code.

Proposed Solution

1. Add queueName field to TaskSpec and TaskTemplate

// TaskSpec (task_types.go)
type TaskSpec struct {
    // ... existing fields ...

    // QueueName assigns this Task to a Kueue LocalQueue for admission control.
    // When set, the Job is created with suspend=true and the
    // kueue.x-k8s.io/queue-name label. Kueue unsuspends the Job when
    // quota is available. When empty, Jobs are created without Kueue
    // integration and start immediately (current behavior).
    // +optional
    QueueName string `json:"queueName,omitempty"`
}

// TaskTemplate (taskspawner_types.go)
type TaskTemplate struct {
    // ... existing fields ...

    // QueueName assigns spawned Tasks to a Kueue LocalQueue.
    // +optional
    QueueName string `json:"queueName,omitempty"`
}

2. Job creation changes in JobBuilder

When queueName is set, two changes to Job creation in job_builder.go:

// In buildAgentJob():
job := &batchv1.Job{
    ObjectMeta: metav1.ObjectMeta{
        Name:      task.Name,
        Namespace: task.Namespace,
        Labels: map[string]string{
            "kelos.dev/name":       "kelos",
            "kelos.dev/component":  "task",
            "kelos.dev/managed-by": "kelos-controller",
            "kelos.dev/task":       task.Name,
        },
    },
    Spec: batchv1.JobSpec{
        BackoffLimit:     &backoffLimit,
        // ...
    },
}

// Add Kueue integration when queueName is set
if task.Spec.QueueName != "" {
    job.Labels["kueue.x-k8s.io/queue-name"] = task.Spec.QueueName
    suspend := true
    job.Spec.Suspend = &suspend
}

This is the entire JobBuilder change — two fields when queueName is non-empty. The existing code path (no queueName) is completely unchanged.
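The change is small enough to sketch end to end. The following is an illustrative, self-contained version using simplified stand-in types (not the real batchv1.Job or TaskSpec structs), showing that the no-queueName path is a no-op:

```go
package main

import "fmt"

// Simplified stand-ins for batchv1.Job and TaskSpec, for illustration only.
type JobSpec struct {
	Suspend *bool
}

type Job struct {
	Labels map[string]string
	Spec   JobSpec
}

type TaskSpec struct {
	QueueName string
}

// applyKueue mirrors the proposed JobBuilder change: when QueueName is
// non-empty, add the queue label and create the Job suspended.
// When empty, the Job is left exactly as built today.
func applyKueue(job *Job, spec TaskSpec) {
	if spec.QueueName == "" {
		return // no Kueue integration: current behavior unchanged
	}
	if job.Labels == nil {
		job.Labels = map[string]string{}
	}
	job.Labels["kueue.x-k8s.io/queue-name"] = spec.QueueName
	suspend := true
	job.Spec.Suspend = &suspend
}

func main() {
	queued := &Job{}
	applyKueue(queued, TaskSpec{QueueName: "kelos-default"})
	fmt.Println(queued.Labels["kueue.x-k8s.io/queue-name"], *queued.Spec.Suspend)

	plain := &Job{}
	applyKueue(plain, TaskSpec{})
	fmt.Println(plain.Labels == nil, plain.Spec.Suspend == nil)
}
```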

3. Status handling — already works

The current updateStatus() logic in task_controller.go:473-514 handles Kueue-suspended Jobs correctly without modification:

if job.Status.Active > 0 {
    newPhase = TaskPhaseRunning        // Pod started → Kueue admitted
} else if job.Status.Succeeded > 0 {
    newPhase = TaskPhaseSucceeded      // Completed
} else if isJobFailed(job) {
    newPhase = TaskPhaseFailed         // Failed
}
// else: no phase change → Task stays Pending (Kueue hasn't admitted yet)

When a Job is suspended by Kueue: Active=0, Succeeded=0, no Failed condition → newPhase is empty → Task stays in Pending. When Kueue unsuspends: pod starts → Active>0 → Task transitions to Running. No changes needed to updateStatus.
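The phase decision above can be exercised in isolation. This is a simplified mirror of the switch (stand-in types, not the real controller code) demonstrating why a Kueue-suspended Job — all counters zero, no Failed condition — produces no phase change:

```go
package main

import "fmt"

// Simplified stand-in for the Job status fields updateStatus inspects.
type JobStatus struct {
	Active    int32
	Succeeded int32
	Failed    bool // stand-in for isJobFailed(job)
}

// phaseFor mirrors the phase decision: an empty result means "no change",
// which leaves a Kueue-queued Task in Pending.
func phaseFor(s JobStatus) string {
	switch {
	case s.Active > 0:
		return "Running" // pod started: Kueue admitted
	case s.Succeeded > 0:
		return "Succeeded"
	case s.Failed:
		return "Failed"
	default:
		return "" // suspended or not yet scheduled: Task stays Pending
	}
}

func main() {
	fmt.Printf("%q\n", phaseFor(JobStatus{}))          // suspended by Kueue
	fmt.Printf("%q\n", phaseFor(JobStatus{Active: 1})) // admitted, running
	fmt.Printf("%q\n", phaseFor(JobStatus{Succeeded: 1}))
}
```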

Optional enhancement: Add a Queued condition to the Task when the Job exists but is suspended, so users can distinguish "Task is waiting for Kueue admission" from "Task hasn't had its Job created yet":

// In updateStatus(), before the phase switch:
if job.Spec.Suspend != nil && *job.Spec.Suspend {
    meta.SetStatusCondition(&task.Status.Conditions, metav1.Condition{
        Type:    "Queued",
        Status:  metav1.ConditionTrue,
        Reason:  "WaitingForAdmission",
        Message: fmt.Sprintf("Job is queued in Kueue LocalQueue %q",
                             task.Spec.QueueName),
    })
}

4. Spawner interaction

The spawner's maxConcurrency check counts active (non-terminal) tasks. Kueue-queued tasks are in Pending phase, which is non-terminal. This means:

  • maxConcurrency still gates how many tasks the spawner creates
  • Kueue gates how many of those tasks actually run
  • Both limits compose naturally: spawner creates up to maxConcurrency tasks, Kueue admits a subset based on quota

No spawner changes are needed.
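The composition of the two limits can be sketched numerically. The quota and per-task figures below are illustrative assumptions, not values from the codebase:

```go
package main

import "fmt"

// spawnerCreates models the existing maxConcurrency gate: the spawner
// creates at most maxConcurrency Tasks regardless of pending work.
func spawnerCreates(pendingItems, maxConcurrency int) int {
	if pendingItems > maxConcurrency {
		return maxConcurrency
	}
	return pendingItems
}

// kueueAdmits models the ClusterQueue gate: of the created Tasks, Kueue
// unsuspends only as many as fit within the memory quota.
func kueueAdmits(created, quotaGi, perTaskGi int) int {
	byQuota := quotaGi / perTaskGi
	if created < byQuota {
		return created
	}
	return byQuota
}

func main() {
	created := spawnerCreates(12, 5)      // maxConcurrency: 5
	running := kueueAdmits(created, 8, 2) // hypothetical 8Gi quota, 2Gi per task
	fmt.Println(created, running)         // 5 Tasks exist, 4 run, 1 stays queued
}
```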

Example Configuration

Kueue resources (cluster-level, set up once by platform team)

apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
  name: agent-nodes
spec:
  nodeLabels:
    kelos.dev/agent-pool: "true"
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: ClusterQueue
metadata:
  name: kelos-agents
spec:
  namespaceSelector: {}
  resourceGroups:
    - coveredResources: ["cpu", "memory"]
      flavors:
        - name: agent-nodes
          resources:
            - name: cpu
              nominalQuota: "16"     # Max 16 CPU across all agent tasks
            - name: memory
              nominalQuota: "32Gi"   # Max 32Gi across all agent tasks
  preemption:
    withinClusterQueue: LowerPriority
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  name: kelos-default
  namespace: kelos-system
spec:
  clusterQueue: kelos-agents

TaskSpawner using Kueue

apiVersion: kelos.dev/v1alpha1
kind: TaskSpawner
metadata:
  name: kelos-workers
  namespace: kelos-system
spec:
  when:
    githubIssues:
      labels: ["agent-ready"]
  maxConcurrency: 5          # Spawner creates up to 5 tasks
  taskTemplate:
    type: claude-code
    queueName: kelos-default  # Kueue admits based on cluster quota
    credentials:
      type: api-key
      secretRef:
        name: anthropic-key
    workspaceRef:
      name: my-workspace
    promptTemplate: "Fix issue #{{.Number}}: {{.Title}}"

Multi-queue setup for priority tiers

# High-priority queue for production bug fixes
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  name: kelos-critical
  namespace: kelos-system
  annotations:
    kueue.x-k8s.io/default-priority: "1000"
spec:
  clusterQueue: kelos-agents
---
# Standard queue for routine work
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  name: kelos-standard
  namespace: kelos-system
  annotations:
    kueue.x-k8s.io/default-priority: "100"
spec:
  clusterQueue: kelos-agents
---
# Critical: production bugs → high-priority queue
apiVersion: kelos.dev/v1alpha1
kind: TaskSpawner
metadata:
  name: kelos-hotfix
spec:
  when:
    githubIssues:
      labels: ["priority/critical", "agent-ready"]
  taskTemplate:
    type: claude-code
    queueName: kelos-critical       # Preempts standard tasks
    model: claude-opus-4-6
    # ...
---
# Standard: routine issues → standard queue
apiVersion: kelos.dev/v1alpha1
kind: TaskSpawner
metadata:
  name: kelos-triage
spec:
  when:
    githubIssues:
      labels: ["triage-accepted", "agent-ready"]
  taskTemplate:
    type: claude-code
    queueName: kelos-standard       # Yields to critical tasks
    model: claude-sonnet-4-6
    # ...

Implementation Scope

Minimal (one PR)

  1. Add QueueName field to TaskSpec and TaskTemplate (~4 lines of type changes)
  2. In JobBuilder.buildAgentJob(), set suspend: true and queue label when QueueName is non-empty (~6 lines)
  3. In TaskReconciler.updateStatus(), optionally add Queued condition for suspended Jobs (~8 lines)
  4. Update spawner task builder to propagate QueueName from template (~2 lines)
  5. Unit tests for JobBuilder with and without queueName
  6. Integration test verifying Job is created with correct labels and suspend flag

Total implementation: ~20 lines of production code. Backward-compatible — omitting queueName preserves existing behavior exactly.

Not in scope

  • Installing or configuring Kueue itself (operator responsibility)
  • Custom Kueue ResourceFlavors for model-based quotas (documented as a pattern, not enforced)
  • Kueue-aware CLI commands (future enhancement)

Backward Compatibility

Fully backward-compatible. The queueName field is optional with empty default. When empty:

  • Jobs are created without suspend or Kueue labels (identical to current behavior)
  • No Kueue installation required
  • No behavioral change for existing TaskSpawners

When Kueue is not installed but queueName is set, Jobs will remain suspended indefinitely. This is a safe failure mode — Tasks stay Pending and don't consume resources. Documentation should warn that Kueue must be installed when queueName is used.

Relationship to Other Proposals
