An operator turns a machine's cpu / gpu (or ingest) worker ON or OFF. The first — and so far only — policy is a hard stop: stop the in-flight work immediately and free RAM/VRAM now. The design leaves a clean seam for softer policies (e.g. drain: finish the current task, then stop) without a schema or API change.
Postgres is the only moving part. The control plane is one table; any process
that can write a row (the engine's helpers, the queue-worker-control CLI, or a
host app such as ai_leads' Rails over the shared DB) can flip a worker.
worker_controls(
host_label text,
queue text, -- 'cpu' | 'gpu' | <ingest queue>
desired_state text, -- 'on' | 'off' (CHECK)
stop_policy text, -- 'hard' (default); free-form, validated in Python
requested_by text,
updated_at timestamptz,
PRIMARY KEY (host_label, queue)
)
This is desired state, written by an operator. It is deliberately a separate
table from worker_heartbeats (which is observed, ephemeral state a live worker
upserts every ~10 s): an OFF state must persist precisely while the worker is NOT
beating — exactly when its heartbeat row is aging out — so it cannot live there.
Keyed (host_label, queue) — the same identity the heartbeat and the claim's
claimed_by/queue use. A host runs several workers under one host_label
(beelink runs a cpu and a gpu worker), so control is per-queue: turning off
"spark gpu" never touches "spark cpu".
A row trigger fires pg_notify('worker_control', '<host>:<queue>') on every
INSERT/UPDATE, so a worker wakes immediately and a plain SQL write from any
consumer (no app-side NOTIFY code) suffices — mirroring the node_job_ready /
ingest_job_ready triggers.
operator/Rails/CLI ──INSERT/UPDATE worker_controls (desired_state='off')──► trigger ──NOTIFY 'worker_control'─┐
▼
running worker: WorkerControlWatcher (daemon, LISTEN + safety poll) sees OFF ──► STOP_POLICIES['hard']
1. requeue this worker's in-flight job(s) (resume-style; node_queue.requeue_running_for_worker)
2. clear the GPU busy-ghost (node_queue.clear_worker_current_model)
3. os._exit(79) (frees RAM/VRAM — OS tears down the CUDA context)
│
supervisor (docker restart: on-failure) restarts the container ◄─────────────────────────────────────────────┘
on boot: claim_worker._park_until_enabled() reads the row → still OFF → PARK
(does NOT claim, does NOT heartbeat → ages out of the capacity gauge)
operator sets desired_state='on' (+ NOTIFY) ──► the parked loop resumes IN PLACE (no restart needed)
A claim worker runs the node body inline on its main thread — no thread or
subprocess wraps it. A watcher thread therefore cannot preempt in-flight work,
and a wedged CUDA kernel won't honour a cooperative cancel_event. Terminating
the process is the only thing that reliably stops the work and reclaims
RAM/VRAM (the OS tears down the CUDA context on exit). This is the same lever
every in-engine watchdog already uses (os._exit codes 75 budget / 76 stall /
77 reassigned / 78 gpu-health); a control hard-stop is 79.
The hard stop reuses the same auto-restart contract the watchdogs rely on
(docker restart: on-failure). On restart the worker re-reads worker_controls
and PARKS instead of claiming — so it comes back idle, not back to work.
Turning a machine off redistributes its in-flight work to a healthy peer (or
back to itself when turned ON) — it does not fail the workflow.
requeue_running_for_worker is resume-style: it flips the row running → queued,
clears the lease, and bumps priority to the front without incrementing
watchdog_retries (this is operational redistribution, not a node failure).
Clearing claimed_by also trips any surviving worker's JobStatusWatcher, so the
job is never double-run across the hand-off.
worker_control.STOP_POLICIES maps a policy name → handler(worker, *, on_exit).
Only "hard" is wired today. Future policies slot in as new handlers with no
schema or API change (stop_policy is free-form TEXT in the DB, validated in
Python against this registry):
drain— stop claiming, finish the current job, then park. Maps onto the existing cooperativeClaimWorker._stop(checked between jobs).pause— stop claiming but keep the warm model loaded for an instant resume (no RAM free).
from queue_workflows import worker_control
worker_control.disable_worker("spark", "gpu") # hard stop + stay off
worker_control.enable_worker("spark", "gpu") # resume
worker_control.set_worker_control("spark", "gpu",
desired_state="off", stop_policy="hard", requested_by="ops")
worker_control.get_worker_control("spark", "gpu") # row or None
worker_control.desired_state_for("spark", "gpu") # 'on' | 'off' (None/absent ⇒ 'on')CLI (console script): queue-worker-control --queue gpu --off [--host H] [--policy hard] / --on.
Direct SQL — any consumer sharing the DB can write the row; the trigger handles
the wake. ai_leads' Rails toggles workers this way (raw INSERT … ON CONFLICT),
no Python on the request path.
get_worker_control swallows UndefinedTable, so the engine runs unchanged on a
DB that predates migration 0012 (treated as ON, default-on).
AI_LEADS_WORKER_CONTROL_POLL_S— safety-poll cadence behind the LISTEN wake (default 5.0 s).AI_LEADS_DISABLE_WORKER_CONTROL— keep the watcher inert (tests).
A hang that holds the GIL freezes the watcher thread too, so the in-process
os._exit can't fire. The ultimate backstop is a host-local agent that
docker kills the container when it sees worker_controls.desired_state='off'
unacked — the engine cannot cross-host kill (it has no docker socket on a remote
host). That host-local kill agent is future hardening, not part of this milestone.