diff --git a/AGENTS.md b/AGENTS.md index 9ba2e34..8952b74 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## What this is -`queue_workflows` is a standalone, pip-installable **Postgres-as-queue workflow engine**: a `SELECT … FOR UPDATE SKIP LOCKED` claim loop woken by `LISTEN`, lease reclaim, a DAG dispatcher with a durable outbox, a GPU warm-model cache, periodic ingest work, per-host hw-metrics telemetry, and an operator worker ON/OFF control plane (`worker_control` — hard-stop/park a `(host, queue)` worker; see `docs/worker_control.md`). **Postgres (via `psycopg` 3) is the only hard runtime dependency.** +`queue_workflows` is a standalone, pip-installable **Postgres-as-queue workflow engine**: a `SELECT … FOR UPDATE SKIP LOCKED` claim loop woken by `LISTEN`, lease reclaim, a DAG dispatcher with a durable outbox, a GPU warm-model cache, periodic ingest work, per-host hw-metrics telemetry, a durable per-node event log, and an operator worker ON/OFF control plane (`worker_control` — hard-stop/park a `(host, queue)` worker; see `docs/worker_control.md` and `docs/watchdogs.md`). **Postgres (via `psycopg` 3) is the only hard runtime dependency.** ## Who uses it / why the `ai_leads` defaults @@ -22,6 +22,10 @@ pip install -e '.[test]' # add [metrics] for the psutil-based hw_metrics # creates it if missing (see tests/conftest.py). QUEUE_WORKFLOWS_TEST_DB_URL=postgresql://user:pw@host:port/queue_workflows_test python -m pytest # (falls back to AI_LEADS_DB_URL with its db-name suffixed _test if the above is unset) +# The multi-backend contract suite (tests/test_backend_contract.py) also reads +# QUEUE_WORKFLOWS_TEST_REDIS_URL / _MONGO_URL (mongo needs a replica set); each +# backend SKIPS if its server is unset/unreachable. `.[test]` pulls redis + +# pymongo; standalone consumers add `.[redis]` / `.[mongodb]`. See docs/storage_backends.md. python -m pytest tests/test_node_queue.py # one module python -m pytest tests/test_node_queue.py::test_name # one test @@ -31,6 +35,7 @@ python -m pytest -k lease # by keyword queue-orchestrator # bootstrap migrations + NodePool (dispatch/outbox/reclaim/input) queue-claim-worker --queue=gpu # one worker process; --queue ∈ {cpu,gpu} (DAG) ∪ config.ingest_queues (default {fetch,load}) queue-scheduler # PG-native ingest ticker +queue-worker-control --queue=gpu --off # operator ON/OFF for a (host,queue) worker (migration 0012; docs/worker_control.md) ``` There is no linter/formatter config and no CI in this repo; match the surrounding style (heavy module/function docstrings explaining *why*, `from __future__ import annotations` everywhere). @@ -54,7 +59,7 @@ This codebase is built test-first, and the test suite is the spec. **Write the f All three run as separate processes against the same database; the DB *is* the message bus. 1. **Orchestrator** (`orchestrator.py` → `node_pool.NodePool`) — the only process that bootstraps migrations. Its `NodePool` runs background threads: the **dispatch loop** (`_tick`) expands freshly-`queued` `mode='node'` runs into node-jobs via `dispatcher.start_run`, **drains the dispatch-event outbox**, and runs the **lease-reclaim sweeps** (node + ingest); plus an **`InputListener`** that polls `workflow_input_submissions` and resumes parked input nodes. No node bodies run here. -2. **Claim worker** (`claim_worker.ClaimWorker`) — **one process == one worker, concurrency-1 by contract.** `run_forever` does `LISTEN ` then drains the queue greedily on each wake (1 s safety poll covers a dropped NOTIFY). `cpu`/`gpu` draw DAG node-jobs from `workflow_node_jobs`; the **ingest-family** queues (`config.ingest_queues`, default `fetch`/`load`) draw standalone ingest jobs from `ingest_jobs`. The GPU worker owns the process-wide warm `ModelCache`. Every claimed job is bracketed by a `LeaseRenewer` + a wall-clock `Watchdog` — and, for a GPU node that reports per-step progress, a no-progress `StallWatchdog` (node-jobs add a run-cancel watcher). See *Lease + reclaim + watchdog* below. +2. **Claim worker** (`claim_worker.ClaimWorker`) — **one process == one worker, concurrency-1 by contract.** `run_forever` does `LISTEN ` then drains the queue greedily on each wake (1 s safety poll covers a dropped NOTIFY). `cpu`/`gpu` draw DAG node-jobs from `workflow_node_jobs`; the **ingest-family** queues (`config.ingest_queues`, default `fetch`/`load`) draw standalone ingest jobs from `ingest_jobs`. The GPU worker owns the process-wide warm `ModelCache` (cache logic lives in `model_cache.py`, DB-decoupled; `gpu_model_cache.py` wires the one process-wide instance and publishes `current_model` to `worker_heartbeats` for affinity routing). Every claimed job is bracketed by a `LeaseRenewer` + a wall-clock `Watchdog`; a GPU node additionally gets the health-driven `GpuHealthWatchdog` (and a `StallWatchdog` for non-video). A node-job also runs a `JobStatusWatcher` (run-cancel/reassign self-kill), and the worker process runs one `WorkerControlWatcher` (operator ON/OFF). See *Lease + reclaim + watchdog* below. 3. **Scheduler** (`scheduler.Ticker`) — a Python loop (not pg_cron) that sleeps to the next scheduled minute and enqueues `ingest_jobs` rows; an ingest claim worker picks them up. ### The queue mechanism @@ -65,15 +70,26 @@ All three run as separate processes against the same database; the DB *is* the m A live worker renews `lease_expires_at` (~every 10 s) while a job runs, so lease length is independent of job duration. A **dead/wedged** worker stops renewing → its lease lapses → the orchestrator's reclaim sweep flips the row back to `queued` (re-firing the NOTIFY); this sweep is the **sole** recovery path for an orphaned `running` row. -**Two daemon watchdogs** bracket every claimed job (both in `claim_worker.py`, both funnelling their terminal action through `_fail_job_and_exit` so the outbox-atomicity contract — mark failed **and** write the `failed` dispatch event in one txn — is coded in exactly one place): +**Three daemon watchdogs** can bracket a claimed job (all in `claim_worker.py`), each hard-exiting with a **distinct code** so the cause is readable from the exit status: -- the wall-clock **`Watchdog`** trips on `elapsed ≥ budget_for(job)` and hard-exits `os._exit(75)`; -- the no-progress **`StallWatchdog`** is *opt-in* (a non-video GPU node whose `run(...)` declares a `status_callback`), **inert until the first per-step `beat()` arms it after the model load** (so a minutes-long cold load is never policed), then trips on a beat gap ≥ `STALL_TIMEOUT_S` (120 s) and exits `76`. +- the wall-clock **`Watchdog`** (exit `75`) trips on `elapsed ≥ budget_for(job)`; +- the no-progress **`StallWatchdog`** (exit `76`) is *opt-in* (a non-video GPU node whose `run(...)` declares a `status_callback`), **inert until the first per-step `beat()` arms it after the model load** (so a minutes-long cold load is never policed), then trips on a beat gap ≥ `STALL_TIMEOUT_S` (120 s); +- the **`GpuHealthWatchdog`** (exit `78`) is the GPU guard — **health-driven, not wall-clock**: every `interval_s` (default 300 s) it trips only when the per-container GPU stayed idle **and** RAM was static across the window (no GPU work *and* no memory movement ⇒ wedged), arming at job start with a 20-min `load_grace_s` first window (safe because a healthy load *moves* RAM and a healthy render *keeps the GPU busy*). It replaced the old fixed GPU wall-clock cap, which couldn't catch the Blackwell qwen 0 %-GPU stall yet false-killed long-but-healthy renders. Per-container GPU%/RAM samplers (`gpu_health.py`) use pmon `sm%` scoped to this container (excludes a co-tenant ollama sidecar) + cgroup RAM. -Either trip lets the lease lapse so reclaim re-queues the work — which is why a GPU worker is one process holding one model: a hard exit kills exactly the hung job. See **`docs/watchdogs.md`** for the full design (why a wall-clock budget alone can't catch the Blackwell qwen 0 %-GPU stall, and the per-step beat plumbing). +All three trips funnel through one policy point — **`_watchdog_trip`**, which does **re-queue-and-retry, not fail** (migration `0010`): for a DAG node-job it flips the row `running → queued`, bumps `watchdog_retries`, and writes **no** dispatch event (so the *run* stays `running` — only this node re-runs; `_requeue_job_and_exit`), then hard-exits so a fresh worker re-claims it. Only once `watchdog_retries` hits `AI_LEADS_WATCHDOG_MAX_RETRIES` (default 3) does it fall back to **`_fail_job_and_exit`** (mark failed **+** the `failed` dispatch event in one txn — the outbox-atomicity contract, coded in exactly one place). An **ingest** job (no run, no retry counter) always takes the fail path. So a single transient wedge no longer kills the whole workflow — and a GPU worker stays one process holding one model: a hard exit kills exactly the hung job. See **`docs/watchdogs.md`** for the full design. + +Two state-watcher threads sit alongside the watchdogs: **`JobStatusWatcher`** (exit `77`) self-kills a worker whose `running` row was cancelled/reassigned out from under it (avoids a double-run), and **`WorkerControlWatcher`** (exit `79`) enforces the operator ON/OFF control plane (below). **Last-resort recovery — the orchestrator-side dead-worker detector.** Every watchdog above is an in-process *thread*; a GPU **hardware-hang** can defeat all of them (the trip signal becomes unobservable from inside — e.g. on ROCm the box-level GPU probe still reads non-idle while *this* render is wedged — or, on a GIL-holding hang, the threads can't run at all). The worker then sits wedged while its `worker_heartbeats.last_seen` freezes. The orchestrator is a **separate process** (GIL-independent of the worker), so `NodePool._tick` adds `_sweep_dead_workers` → `node_queue.flag_stale_workers_holding_running_jobs`: it flags any worker whose heartbeat is stale (>30 s, 3× the cadence) **while it still owns a `running` job** (join `claimed_by = host_label`), stamping `worker_heartbeats.last_flagged_dead_at` (migration 0009) + an actionable `DEAD WORKER:` ERROR. The JOB is recovered by the lease-reclaim as usual; this flags the dead **process** for a host-supervisor to bounce (the orchestrator can't safely cross-host-kill it). A fresh heartbeat clears the flag. See **`docs/watchdogs.md` → "last-resort layer"** for the root-cause and the host-supervisor hook. +### Durable node-event history (migration 0011) + +`workflow_node_jobs` is one *mutable* row per `(run_id, node_id)` — a watchdog re-queue overwrites `claimed_by`/timing and only bumps `watchdog_retries`, so the prior attempt's worker, timing, and trip reason are lost. `workflow_node_events` is an **append-only** forensic log of the per-attempt lifecycle (`claimed`, `model_load_*`, `progress_beat`, `stall_*`, `gpu_health_trip`, `budget_trip`, `requeued`, `reassigned`, terminal …; `attempt` = `watchdog_retries` at emit ties the tries of one node together). Writers in `node_queue`: `record_node_event` (best-effort — own connection, swallow-on-failure, so an event blip can **never** fail the load-bearing claim/terminal/watchdog path) and `record_node_event_in_txn` (terminal + `requeued` events ride the **same txn** as the state change — the dispatch-outbox atomicity pattern). The worker emits via `claim_worker._emit_node_event`; `NodePool` prunes old rows on a sweep (`prune_node_events`, default 30-day retention). **Append-only — no UPDATE path**, so it adds no new mutation invariant. + +### Operator worker ON/OFF control plane (migration 0012) + +`worker_controls` is **desired** state (an operator or host UI writes a `(host_label, queue)` row), kept deliberately separate from the *observed* `worker_heartbeats` — an OFF state must persist precisely while the worker isn't beating. A row trigger fires `pg_notify('worker_control', ':')` inside the writer's txn, so a plain SQL write from any DB consumer wakes the worker with no app-side NOTIFY. `worker_control.WorkerControlWatcher` (LISTEN + 5 s safety poll) enforces it; on OFF it dispatches the row's `stop_policy` through the `STOP_POLICIES` registry — only `"hard"` exists today (`"drain"`/`"pause"` are reserved names that slot in with no schema change, which is why `stop_policy` is free-form TEXT, not a CHECK). **Hard stop = process exit (`os._exit(79)`)**: the node body runs inline on the worker's main thread, so killing the process is the only thing that reliably stops a wedged CUDA kernel and frees VRAM (it re-queues the in-flight job first, resume-style, with **no** `watchdog_retries` bump — an operator stop isn't a fault). The supervisor restarts the container; on boot the worker re-reads `worker_controls` and **parks** (idle, not claiming) while still OFF. A worker absent from the table — or a DB predating 0012 (`get_worker_control` swallows `UndefinedTable`) — is treated as **ON**, so the engine runs unchanged before 0012 (claim workers gate on schema 6/8, not 12). See `docs/worker_control.md`. + ### DAG dispatch + the durable outbox (key decoupling) `dispatcher.py` is **pure DAG-walk logic** (unit-testable without a worker pool): expand a run's initial nodes, and on each node terminal event find downstream nodes whose deps are all `completed`/`skipped` and enqueue (or insert a `skipped` marker per `skip_if`). The worker→dispatcher handoff is an **outbox**: when a worker finalizes a node it writes the terminal status **and** a `workflow_dispatch_events` row in **one transaction** (`node_executor.execute_node`). The orchestrator drains that outbox and calls `on_node_completed`/`on_node_failed`/`on_node_awaiting_input`. So fan-out is retryable and never synchronously coupled to the worker; a failing callback is retried next tick (and poison-flagged after `_DISPATCH_MAX_ATTEMPTS`). @@ -100,15 +116,35 @@ Everything domain-specific is an **injected hook** on a process-wide `EngineConf **Every hook has a safe default**, so `import queue_workflows` + `configure()` + a reachable Postgres runs standalone. When working in any engine module, never reach "up" into a host — add a config hook with a default instead. `config.py` is a **leaf** (imports nothing from other engine modules) to keep the dependency graph acyclic; respect that (e.g. it lazily imports `refs` only inside `get_resolve_ref`). +### Pluggable storage backends (the `db_backend` seam — additive, v0.3.0) + +Beyond the host hooks, the **storage layer itself** is selectable: +`configure(db_backend="pg"|"redis"|"mongodb")` resolves a `StorageBackend` +(`queue_workflows/backends/`, **one provider per file**) — a generic durable-queue +SPI (enqueue / claim-exactly-once / lease+reclaim / idempotent terminals / the +**atomic outbox** `complete_with_event`/`fail_with_event` / wake / heartbeat / +ON-OFF control). It is **additive and opt-in**: `pg` (default) is byte-compatible +and the legacy engine modules still talk to Postgres directly — selecting +redis/mongo does **not** re-home the orchestrator/worker (a later milestone), and +the redis/pymongo drivers import lazily so a pg-only deploy needs neither. Two +invariants make it honest: the port leaks **no** driver object (no +cursor/pipeline/session in any signature — the anti-leakage rule), and every +backend is **namespace-bound** so two tenants on one redis/mongo server can't see +each other's jobs. pg uses `SKIP LOCKED`/`RETURNING`/one-txn-outbox/`LISTEN`; +redis uses **Lua** (atomic claim+terminal+event) + pub/sub; mongodb uses +`find_one_and_update` + a **multi-doc txn** + a change stream (**replica set +required**). The contract is one parametrized suite (`tests/test_backend_contract.py`) +green against all three live servers. See `docs/storage_backends.md`. + ### Migrations — the engine owns one chain, hosts run a second The engine owns `queue_workflows/migrations/NNNN_*.sql` (+ paired `.down.sql`), shipped as package data, tracked in the `queue_schema_version` ledger. `db.bootstrap()` applies the chain idempotently; `db.downgrade()` reverses it. A host with its own domain tables runs a **second** chain via `db.bootstrap(migrations_dir=..., version_table=...)` against its own ledger — "two ORMs / two chains, one Postgres." **Only the orchestrator bootstraps** (`db.bootstrap` takes no advisory lock); claim workers call `db.wait_for_schema(min_version)` and block until the schema is ready rather than racing the migration run (`_REQUIRED_SCHEMA_VERSION` maps each queue to its minimum version). -The chain: `0001` `workflow_runs` → `0002` `workflow_node_jobs` → `0003` `workflow_input_submissions` → `0004` `workflow_dispatch_events` → `0005` `worker_heartbeats` → `0006` lease columns + `node_job_ready` trigger → `0007` `ingest_jobs` + `ingest_job_ready` trigger → `0008` multi-tenant ingest (adds per-job `args JSONB`; drops the `fetch`/`load` queue CHECK and the `cpu`/`gpu`-only `worker_heartbeats` CHECK so those allow-lists move host-side — all additive/idempotent). Ingest queues therefore require schema version ≥ 8. `run_store` treats `parcel_id` as an opaque nullable column (the engine drops the host's parcels FK) so the engine never knows the host's domain. +The chain: `0001` `workflow_runs` → `0002` `workflow_node_jobs` → `0003` `workflow_input_submissions` → `0004` `workflow_dispatch_events` → `0005` `worker_heartbeats` → `0006` lease columns + `node_job_ready` trigger → `0007` `ingest_jobs` + `ingest_job_ready` trigger → `0008` multi-tenant ingest (adds per-job `args JSONB`; drops the `fetch`/`load` queue CHECK and the `cpu`/`gpu`-only `worker_heartbeats` CHECK so those allow-lists move host-side — all additive/idempotent) → `0009` `worker_heartbeats.last_flagged_dead_at` (dead-worker flag) → `0010` `workflow_node_jobs.watchdog_retries` (watchdog re-queue counter) → `0011` `workflow_node_events` (append-only per-attempt event log) → `0012` `worker_controls` (+ `worker_control` NOTIFY trigger). Ingest queues require schema version ≥ 8; worker-control is read-optional — claim workers gate on 6/8, not 12, and treat a pre-0012 DB as all-ON. `run_store` treats `parcel_id` as an opaque nullable column (the engine drops the host's parcels FK) so the engine never knows the host's domain. ### Idempotency contracts to preserve -`mark_completed`/`mark_failed`/`mark_awaiting_input` (and the ingest twins) all `UPDATE … WHERE status NOT IN ('completed','failed','cancelled') RETURNING *` and return `None` when the row was already terminal. This `WHERE` is load-bearing: it makes duplicate deliveries and claim-race losers safe, and stops a stray second call from clobbering a finalized `context_delta`. JSON columns are pre-validated (`json.dumps`) before any state mutation so a bad payload fails before the write. Keep this shape for any new state transition. +`mark_completed`/`mark_failed`/`mark_awaiting_input` (and the ingest twins) all `UPDATE … WHERE status NOT IN ('completed','failed','cancelled') RETURNING *` and return `None` when the row was already terminal. This `WHERE` is load-bearing: it makes duplicate deliveries and claim-race losers safe, and stops a stray second call from clobbering a finalized `context_delta`. JSON columns are pre-validated (`json.dumps`) before any state mutation so a bad payload fails before the write. Keep this shape for any new state transition. (The one deliberate exception is `workflow_node_events` — **append-only, no UPDATE path** — whose terminal/`requeued` rows instead ride the state-change txn, like the dispatch outbox.) ### Telemetry (hw_metrics + cgroup attribution) diff --git a/CHANGELOG.md b/CHANGELOG.md index c91b639..0dc2088 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.3.0] — 2026-05-27 ### Added +- **Pluggable storage backends** — a `StorageBackend` SPI + (`queue_workflows.backends`) makes the durable-queue store selectable via + `configure(db_backend="pg"|"redis"|"mongodb")`, one provider per file. `pg` + (default) is byte-compatible and unchanged; `redis` (atomic claim/terminal via + Lua + pub/sub wake) and `mongodb` (`find_one_and_update` + multi-doc-txn outbox + + change-stream wake, replica set) reproduce the same contract — claim + exactly-once, lease/reclaim, idempotent terminals, the atomic outbox, and + per-namespace tenant isolation — pinned by a parametrized contract suite run + against all three live servers. Optional drivers: `queue_workflows[redis]` / + `[mongodb]`. See `docs/storage_backends.md`. - Operator worker ON/OFF control plane: a `worker_controls` table + a per-`(host, queue)` `WorkerControlWatcher` that hard-stops (kill in-flight + free RAM/VRAM) or parks a worker on command (migration 0012). See diff --git a/CLAUDE.md b/CLAUDE.md index 9ba2e34..8952b74 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## What this is -`queue_workflows` is a standalone, pip-installable **Postgres-as-queue workflow engine**: a `SELECT … FOR UPDATE SKIP LOCKED` claim loop woken by `LISTEN`, lease reclaim, a DAG dispatcher with a durable outbox, a GPU warm-model cache, periodic ingest work, per-host hw-metrics telemetry, and an operator worker ON/OFF control plane (`worker_control` — hard-stop/park a `(host, queue)` worker; see `docs/worker_control.md`). **Postgres (via `psycopg` 3) is the only hard runtime dependency.** +`queue_workflows` is a standalone, pip-installable **Postgres-as-queue workflow engine**: a `SELECT … FOR UPDATE SKIP LOCKED` claim loop woken by `LISTEN`, lease reclaim, a DAG dispatcher with a durable outbox, a GPU warm-model cache, periodic ingest work, per-host hw-metrics telemetry, a durable per-node event log, and an operator worker ON/OFF control plane (`worker_control` — hard-stop/park a `(host, queue)` worker; see `docs/worker_control.md` and `docs/watchdogs.md`). **Postgres (via `psycopg` 3) is the only hard runtime dependency.** ## Who uses it / why the `ai_leads` defaults @@ -22,6 +22,10 @@ pip install -e '.[test]' # add [metrics] for the psutil-based hw_metrics # creates it if missing (see tests/conftest.py). QUEUE_WORKFLOWS_TEST_DB_URL=postgresql://user:pw@host:port/queue_workflows_test python -m pytest # (falls back to AI_LEADS_DB_URL with its db-name suffixed _test if the above is unset) +# The multi-backend contract suite (tests/test_backend_contract.py) also reads +# QUEUE_WORKFLOWS_TEST_REDIS_URL / _MONGO_URL (mongo needs a replica set); each +# backend SKIPS if its server is unset/unreachable. `.[test]` pulls redis + +# pymongo; standalone consumers add `.[redis]` / `.[mongodb]`. See docs/storage_backends.md. python -m pytest tests/test_node_queue.py # one module python -m pytest tests/test_node_queue.py::test_name # one test @@ -31,6 +35,7 @@ python -m pytest -k lease # by keyword queue-orchestrator # bootstrap migrations + NodePool (dispatch/outbox/reclaim/input) queue-claim-worker --queue=gpu # one worker process; --queue ∈ {cpu,gpu} (DAG) ∪ config.ingest_queues (default {fetch,load}) queue-scheduler # PG-native ingest ticker +queue-worker-control --queue=gpu --off # operator ON/OFF for a (host,queue) worker (migration 0012; docs/worker_control.md) ``` There is no linter/formatter config and no CI in this repo; match the surrounding style (heavy module/function docstrings explaining *why*, `from __future__ import annotations` everywhere). @@ -54,7 +59,7 @@ This codebase is built test-first, and the test suite is the spec. **Write the f All three run as separate processes against the same database; the DB *is* the message bus. 1. **Orchestrator** (`orchestrator.py` → `node_pool.NodePool`) — the only process that bootstraps migrations. Its `NodePool` runs background threads: the **dispatch loop** (`_tick`) expands freshly-`queued` `mode='node'` runs into node-jobs via `dispatcher.start_run`, **drains the dispatch-event outbox**, and runs the **lease-reclaim sweeps** (node + ingest); plus an **`InputListener`** that polls `workflow_input_submissions` and resumes parked input nodes. No node bodies run here. -2. **Claim worker** (`claim_worker.ClaimWorker`) — **one process == one worker, concurrency-1 by contract.** `run_forever` does `LISTEN ` then drains the queue greedily on each wake (1 s safety poll covers a dropped NOTIFY). `cpu`/`gpu` draw DAG node-jobs from `workflow_node_jobs`; the **ingest-family** queues (`config.ingest_queues`, default `fetch`/`load`) draw standalone ingest jobs from `ingest_jobs`. The GPU worker owns the process-wide warm `ModelCache`. Every claimed job is bracketed by a `LeaseRenewer` + a wall-clock `Watchdog` — and, for a GPU node that reports per-step progress, a no-progress `StallWatchdog` (node-jobs add a run-cancel watcher). See *Lease + reclaim + watchdog* below. +2. **Claim worker** (`claim_worker.ClaimWorker`) — **one process == one worker, concurrency-1 by contract.** `run_forever` does `LISTEN ` then drains the queue greedily on each wake (1 s safety poll covers a dropped NOTIFY). `cpu`/`gpu` draw DAG node-jobs from `workflow_node_jobs`; the **ingest-family** queues (`config.ingest_queues`, default `fetch`/`load`) draw standalone ingest jobs from `ingest_jobs`. The GPU worker owns the process-wide warm `ModelCache` (cache logic lives in `model_cache.py`, DB-decoupled; `gpu_model_cache.py` wires the one process-wide instance and publishes `current_model` to `worker_heartbeats` for affinity routing). Every claimed job is bracketed by a `LeaseRenewer` + a wall-clock `Watchdog`; a GPU node additionally gets the health-driven `GpuHealthWatchdog` (and a `StallWatchdog` for non-video). A node-job also runs a `JobStatusWatcher` (run-cancel/reassign self-kill), and the worker process runs one `WorkerControlWatcher` (operator ON/OFF). See *Lease + reclaim + watchdog* below. 3. **Scheduler** (`scheduler.Ticker`) — a Python loop (not pg_cron) that sleeps to the next scheduled minute and enqueues `ingest_jobs` rows; an ingest claim worker picks them up. ### The queue mechanism @@ -65,15 +70,26 @@ All three run as separate processes against the same database; the DB *is* the m A live worker renews `lease_expires_at` (~every 10 s) while a job runs, so lease length is independent of job duration. A **dead/wedged** worker stops renewing → its lease lapses → the orchestrator's reclaim sweep flips the row back to `queued` (re-firing the NOTIFY); this sweep is the **sole** recovery path for an orphaned `running` row. -**Two daemon watchdogs** bracket every claimed job (both in `claim_worker.py`, both funnelling their terminal action through `_fail_job_and_exit` so the outbox-atomicity contract — mark failed **and** write the `failed` dispatch event in one txn — is coded in exactly one place): +**Three daemon watchdogs** can bracket a claimed job (all in `claim_worker.py`), each hard-exiting with a **distinct code** so the cause is readable from the exit status: -- the wall-clock **`Watchdog`** trips on `elapsed ≥ budget_for(job)` and hard-exits `os._exit(75)`; -- the no-progress **`StallWatchdog`** is *opt-in* (a non-video GPU node whose `run(...)` declares a `status_callback`), **inert until the first per-step `beat()` arms it after the model load** (so a minutes-long cold load is never policed), then trips on a beat gap ≥ `STALL_TIMEOUT_S` (120 s) and exits `76`. +- the wall-clock **`Watchdog`** (exit `75`) trips on `elapsed ≥ budget_for(job)`; +- the no-progress **`StallWatchdog`** (exit `76`) is *opt-in* (a non-video GPU node whose `run(...)` declares a `status_callback`), **inert until the first per-step `beat()` arms it after the model load** (so a minutes-long cold load is never policed), then trips on a beat gap ≥ `STALL_TIMEOUT_S` (120 s); +- the **`GpuHealthWatchdog`** (exit `78`) is the GPU guard — **health-driven, not wall-clock**: every `interval_s` (default 300 s) it trips only when the per-container GPU stayed idle **and** RAM was static across the window (no GPU work *and* no memory movement ⇒ wedged), arming at job start with a 20-min `load_grace_s` first window (safe because a healthy load *moves* RAM and a healthy render *keeps the GPU busy*). It replaced the old fixed GPU wall-clock cap, which couldn't catch the Blackwell qwen 0 %-GPU stall yet false-killed long-but-healthy renders. Per-container GPU%/RAM samplers (`gpu_health.py`) use pmon `sm%` scoped to this container (excludes a co-tenant ollama sidecar) + cgroup RAM. -Either trip lets the lease lapse so reclaim re-queues the work — which is why a GPU worker is one process holding one model: a hard exit kills exactly the hung job. See **`docs/watchdogs.md`** for the full design (why a wall-clock budget alone can't catch the Blackwell qwen 0 %-GPU stall, and the per-step beat plumbing). +All three trips funnel through one policy point — **`_watchdog_trip`**, which does **re-queue-and-retry, not fail** (migration `0010`): for a DAG node-job it flips the row `running → queued`, bumps `watchdog_retries`, and writes **no** dispatch event (so the *run* stays `running` — only this node re-runs; `_requeue_job_and_exit`), then hard-exits so a fresh worker re-claims it. Only once `watchdog_retries` hits `AI_LEADS_WATCHDOG_MAX_RETRIES` (default 3) does it fall back to **`_fail_job_and_exit`** (mark failed **+** the `failed` dispatch event in one txn — the outbox-atomicity contract, coded in exactly one place). An **ingest** job (no run, no retry counter) always takes the fail path. So a single transient wedge no longer kills the whole workflow — and a GPU worker stays one process holding one model: a hard exit kills exactly the hung job. See **`docs/watchdogs.md`** for the full design. + +Two state-watcher threads sit alongside the watchdogs: **`JobStatusWatcher`** (exit `77`) self-kills a worker whose `running` row was cancelled/reassigned out from under it (avoids a double-run), and **`WorkerControlWatcher`** (exit `79`) enforces the operator ON/OFF control plane (below). **Last-resort recovery — the orchestrator-side dead-worker detector.** Every watchdog above is an in-process *thread*; a GPU **hardware-hang** can defeat all of them (the trip signal becomes unobservable from inside — e.g. on ROCm the box-level GPU probe still reads non-idle while *this* render is wedged — or, on a GIL-holding hang, the threads can't run at all). The worker then sits wedged while its `worker_heartbeats.last_seen` freezes. The orchestrator is a **separate process** (GIL-independent of the worker), so `NodePool._tick` adds `_sweep_dead_workers` → `node_queue.flag_stale_workers_holding_running_jobs`: it flags any worker whose heartbeat is stale (>30 s, 3× the cadence) **while it still owns a `running` job** (join `claimed_by = host_label`), stamping `worker_heartbeats.last_flagged_dead_at` (migration 0009) + an actionable `DEAD WORKER:` ERROR. The JOB is recovered by the lease-reclaim as usual; this flags the dead **process** for a host-supervisor to bounce (the orchestrator can't safely cross-host-kill it). A fresh heartbeat clears the flag. See **`docs/watchdogs.md` → "last-resort layer"** for the root-cause and the host-supervisor hook. +### Durable node-event history (migration 0011) + +`workflow_node_jobs` is one *mutable* row per `(run_id, node_id)` — a watchdog re-queue overwrites `claimed_by`/timing and only bumps `watchdog_retries`, so the prior attempt's worker, timing, and trip reason are lost. `workflow_node_events` is an **append-only** forensic log of the per-attempt lifecycle (`claimed`, `model_load_*`, `progress_beat`, `stall_*`, `gpu_health_trip`, `budget_trip`, `requeued`, `reassigned`, terminal …; `attempt` = `watchdog_retries` at emit ties the tries of one node together). Writers in `node_queue`: `record_node_event` (best-effort — own connection, swallow-on-failure, so an event blip can **never** fail the load-bearing claim/terminal/watchdog path) and `record_node_event_in_txn` (terminal + `requeued` events ride the **same txn** as the state change — the dispatch-outbox atomicity pattern). The worker emits via `claim_worker._emit_node_event`; `NodePool` prunes old rows on a sweep (`prune_node_events`, default 30-day retention). **Append-only — no UPDATE path**, so it adds no new mutation invariant. + +### Operator worker ON/OFF control plane (migration 0012) + +`worker_controls` is **desired** state (an operator or host UI writes a `(host_label, queue)` row), kept deliberately separate from the *observed* `worker_heartbeats` — an OFF state must persist precisely while the worker isn't beating. A row trigger fires `pg_notify('worker_control', ':')` inside the writer's txn, so a plain SQL write from any DB consumer wakes the worker with no app-side NOTIFY. `worker_control.WorkerControlWatcher` (LISTEN + 5 s safety poll) enforces it; on OFF it dispatches the row's `stop_policy` through the `STOP_POLICIES` registry — only `"hard"` exists today (`"drain"`/`"pause"` are reserved names that slot in with no schema change, which is why `stop_policy` is free-form TEXT, not a CHECK). **Hard stop = process exit (`os._exit(79)`)**: the node body runs inline on the worker's main thread, so killing the process is the only thing that reliably stops a wedged CUDA kernel and frees VRAM (it re-queues the in-flight job first, resume-style, with **no** `watchdog_retries` bump — an operator stop isn't a fault). The supervisor restarts the container; on boot the worker re-reads `worker_controls` and **parks** (idle, not claiming) while still OFF. A worker absent from the table — or a DB predating 0012 (`get_worker_control` swallows `UndefinedTable`) — is treated as **ON**, so the engine runs unchanged before 0012 (claim workers gate on schema 6/8, not 12). See `docs/worker_control.md`. + ### DAG dispatch + the durable outbox (key decoupling) `dispatcher.py` is **pure DAG-walk logic** (unit-testable without a worker pool): expand a run's initial nodes, and on each node terminal event find downstream nodes whose deps are all `completed`/`skipped` and enqueue (or insert a `skipped` marker per `skip_if`). The worker→dispatcher handoff is an **outbox**: when a worker finalizes a node it writes the terminal status **and** a `workflow_dispatch_events` row in **one transaction** (`node_executor.execute_node`). The orchestrator drains that outbox and calls `on_node_completed`/`on_node_failed`/`on_node_awaiting_input`. So fan-out is retryable and never synchronously coupled to the worker; a failing callback is retried next tick (and poison-flagged after `_DISPATCH_MAX_ATTEMPTS`). @@ -100,15 +116,35 @@ Everything domain-specific is an **injected hook** on a process-wide `EngineConf **Every hook has a safe default**, so `import queue_workflows` + `configure()` + a reachable Postgres runs standalone. When working in any engine module, never reach "up" into a host — add a config hook with a default instead. `config.py` is a **leaf** (imports nothing from other engine modules) to keep the dependency graph acyclic; respect that (e.g. it lazily imports `refs` only inside `get_resolve_ref`). +### Pluggable storage backends (the `db_backend` seam — additive, v0.3.0) + +Beyond the host hooks, the **storage layer itself** is selectable: +`configure(db_backend="pg"|"redis"|"mongodb")` resolves a `StorageBackend` +(`queue_workflows/backends/`, **one provider per file**) — a generic durable-queue +SPI (enqueue / claim-exactly-once / lease+reclaim / idempotent terminals / the +**atomic outbox** `complete_with_event`/`fail_with_event` / wake / heartbeat / +ON-OFF control). It is **additive and opt-in**: `pg` (default) is byte-compatible +and the legacy engine modules still talk to Postgres directly — selecting +redis/mongo does **not** re-home the orchestrator/worker (a later milestone), and +the redis/pymongo drivers import lazily so a pg-only deploy needs neither. Two +invariants make it honest: the port leaks **no** driver object (no +cursor/pipeline/session in any signature — the anti-leakage rule), and every +backend is **namespace-bound** so two tenants on one redis/mongo server can't see +each other's jobs. pg uses `SKIP LOCKED`/`RETURNING`/one-txn-outbox/`LISTEN`; +redis uses **Lua** (atomic claim+terminal+event) + pub/sub; mongodb uses +`find_one_and_update` + a **multi-doc txn** + a change stream (**replica set +required**). The contract is one parametrized suite (`tests/test_backend_contract.py`) +green against all three live servers. See `docs/storage_backends.md`. + ### Migrations — the engine owns one chain, hosts run a second The engine owns `queue_workflows/migrations/NNNN_*.sql` (+ paired `.down.sql`), shipped as package data, tracked in the `queue_schema_version` ledger. `db.bootstrap()` applies the chain idempotently; `db.downgrade()` reverses it. A host with its own domain tables runs a **second** chain via `db.bootstrap(migrations_dir=..., version_table=...)` against its own ledger — "two ORMs / two chains, one Postgres." **Only the orchestrator bootstraps** (`db.bootstrap` takes no advisory lock); claim workers call `db.wait_for_schema(min_version)` and block until the schema is ready rather than racing the migration run (`_REQUIRED_SCHEMA_VERSION` maps each queue to its minimum version). -The chain: `0001` `workflow_runs` → `0002` `workflow_node_jobs` → `0003` `workflow_input_submissions` → `0004` `workflow_dispatch_events` → `0005` `worker_heartbeats` → `0006` lease columns + `node_job_ready` trigger → `0007` `ingest_jobs` + `ingest_job_ready` trigger → `0008` multi-tenant ingest (adds per-job `args JSONB`; drops the `fetch`/`load` queue CHECK and the `cpu`/`gpu`-only `worker_heartbeats` CHECK so those allow-lists move host-side — all additive/idempotent). Ingest queues therefore require schema version ≥ 8. `run_store` treats `parcel_id` as an opaque nullable column (the engine drops the host's parcels FK) so the engine never knows the host's domain. +The chain: `0001` `workflow_runs` → `0002` `workflow_node_jobs` → `0003` `workflow_input_submissions` → `0004` `workflow_dispatch_events` → `0005` `worker_heartbeats` → `0006` lease columns + `node_job_ready` trigger → `0007` `ingest_jobs` + `ingest_job_ready` trigger → `0008` multi-tenant ingest (adds per-job `args JSONB`; drops the `fetch`/`load` queue CHECK and the `cpu`/`gpu`-only `worker_heartbeats` CHECK so those allow-lists move host-side — all additive/idempotent) → `0009` `worker_heartbeats.last_flagged_dead_at` (dead-worker flag) → `0010` `workflow_node_jobs.watchdog_retries` (watchdog re-queue counter) → `0011` `workflow_node_events` (append-only per-attempt event log) → `0012` `worker_controls` (+ `worker_control` NOTIFY trigger). Ingest queues require schema version ≥ 8; worker-control is read-optional — claim workers gate on 6/8, not 12, and treat a pre-0012 DB as all-ON. `run_store` treats `parcel_id` as an opaque nullable column (the engine drops the host's parcels FK) so the engine never knows the host's domain. ### Idempotency contracts to preserve -`mark_completed`/`mark_failed`/`mark_awaiting_input` (and the ingest twins) all `UPDATE … WHERE status NOT IN ('completed','failed','cancelled') RETURNING *` and return `None` when the row was already terminal. This `WHERE` is load-bearing: it makes duplicate deliveries and claim-race losers safe, and stops a stray second call from clobbering a finalized `context_delta`. JSON columns are pre-validated (`json.dumps`) before any state mutation so a bad payload fails before the write. Keep this shape for any new state transition. +`mark_completed`/`mark_failed`/`mark_awaiting_input` (and the ingest twins) all `UPDATE … WHERE status NOT IN ('completed','failed','cancelled') RETURNING *` and return `None` when the row was already terminal. This `WHERE` is load-bearing: it makes duplicate deliveries and claim-race losers safe, and stops a stray second call from clobbering a finalized `context_delta`. JSON columns are pre-validated (`json.dumps`) before any state mutation so a bad payload fails before the write. Keep this shape for any new state transition. (The one deliberate exception is `workflow_node_events` — **append-only, no UPDATE path** — whose terminal/`requeued` rows instead ride the state-change txn, like the dispatch outbox.) ### Telemetry (hw_metrics + cgroup attribution) diff --git a/README.md b/README.md index 9806099..b84e635 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,9 @@ rationale. detector (last-resort recovery from a GPU hardware hang). - [`docs/worker_control.md`](docs/worker_control.md) — the operator ON/OFF control plane (hard-stop vs park a `(host, queue)` worker). +- [`docs/storage_backends.md`](docs/storage_backends.md) — the pluggable + `StorageBackend` SPI (`pg` / `redis` / `mongodb`): the contract, the + capability matrix, and the integration boundary. ## Quick start @@ -208,6 +211,29 @@ worker absent from `worker_controls` is treated as ON. See [`docs/worker_control.md`](docs/worker_control.md) for the full design (the `os._exit(79)` hard-stop contract and the extensible stop-policy seam). +## Pluggable storage backends (`pg` · `redis` · `mongodb`) + +The queue store is selectable (since v0.3.0). Postgres is the default and the +reference path; `redis` and `mongodb` are **opt-in** providers that reproduce the +same durable-queue contract — claim exactly-once, lease/reclaim, idempotent +terminals, an **atomic outbox**, and per-namespace tenant isolation. + +```python +import queue_workflows +queue_workflows.configure(db_backend="redis") # or "mongodb" / "pg" (default) +from queue_workflows.backends import get_backend +be = get_backend() +job_id = be.enqueue("cpu", {"task": "render"}) +job = be.claim("cpu", worker="box-1", lease_s=30) +be.complete_with_event(job["id"], "completed", result={"ok": True}) +``` + +`pip install 'queue_workflows[redis]'` / `'[mongodb]'` for the optional drivers +(mongo needs a replica set). This is **additive** — it does not move the existing +orchestrator/worker off Postgres. See +[`docs/storage_backends.md`](docs/storage_backends.md) for the design, the +capability matrix (and its caveats), and the integration boundary. + ## Tests ``` @@ -217,4 +243,6 @@ QUEUE_WORKFLOWS_TEST_DB_URL=postgresql://user:pw@host:port/queue_workflows_test ``` The suite forces a `*_test` DB and applies the engine migration chain only. See -`tests/conftest.py`. +`tests/conftest.py`. The multi-backend contract suite additionally reads +`QUEUE_WORKFLOWS_TEST_REDIS_URL` / `QUEUE_WORKFLOWS_TEST_MONGO_URL` (each backend +skips if its server is unset/unreachable). diff --git a/docs/storage_backends.md b/docs/storage_backends.md new file mode 100644 index 0000000..5f5fd2c --- /dev/null +++ b/docs/storage_backends.md @@ -0,0 +1,123 @@ +# Pluggable storage backends (`pg` · `redis` · `mongodb`) + +`queue_workflows` is, by design, a **Postgres-as-queue** engine — its claim loop, +lease/reclaim, dispatch outbox and wake are written directly in SQL. That stays +the reference path. This document describes the **`StorageBackend` SPI**, an +additive seam (since v0.3.0) that makes the *queue storage* selectable so the +same durable-queue semantics can run on Redis or MongoDB. + +```python +import queue_workflows +queue_workflows.configure(db_backend="redis") # or "mongodb", or "pg" (default) +from queue_workflows.backends import get_backend +be = get_backend() # bound to config.db_namespace +jid = be.enqueue("cpu", {"task": "render"}) +job = be.claim("cpu", worker="box-1", lease_s=30) +be.complete_with_event(job["id"], "completed", result={"ok": True}) +``` + +Selecting `pg` (the default) imports **nothing** new and changes nothing — the +seam is opt-in and the legacy engine is untouched. + +## What the SPI is — and is not + +The port (`queue_workflows/backends/base.py`, `StorageBackend`) is a **generic +durable work-queue with a transactional outbox**: + +- `enqueue` / `claim` (exactly-once) / `renew_lease` / `reclaim_expired` / `requeue_for_retry` +- idempotent terminals (`mark_completed` / `mark_failed`) and the **atomic outbox** + (`complete_with_event` / `fail_with_event` — go terminal *and* append the event, + both-or-neither) +- best-effort wake (`notify` / `subscribe`), `heartbeat` / `workers`, and the + operator `set_control` / `desired_state` (ON/OFF, default-ON) + +It is deliberately **not leaky**: no method takes or returns a driver handle +(psycopg cursor, redis pipeline, pymongo session). Each backend is **bound to one +namespace** and scopes every key/row/collection by it, so two tenants on one +server are isolated. + +> **Integration boundary (read this).** The SPI is **additive**. Selecting +> `redis`/`mongodb` does **not** re-home the existing orchestrator / claim-worker +> / dispatcher — those still run on Postgres. v0.3.0 ships the SPI + three +> backends that pass an identical contract suite; wiring the DAG orchestrator +> end-to-end onto a non-PG backend is a later milestone. Use the SPI today as a +> standalone pluggable durable queue. + +## How each backend reproduces the contract + +| Guarantee | `pg` | `redis` | `mongodb` | +|---|---|---|---| +| Claim exactly-once | `FOR UPDATE SKIP LOCKED` | `ZPOPMIN` inside a **Lua** script | `find_one_and_update` | +| Priority + FIFO | `ORDER BY priority DESC, created_at` | sorted set, score `-priority`, FIFO tiebreak | sort `priority desc, seq asc` | +| Lease / reclaim | lease column + `UPDATE … lease < now()` | per-queue running ZSET scored by expiry | `lease_expires_at < now` sweep | +| Idempotent terminal | `UPDATE … WHERE status NOT IN (terminal) RETURNING *` | Lua status guard | `find_one_and_update` status guard | +| **Atomic outbox** | one **transaction** | one **Lua script** | one **multi-doc transaction** | +| Wake | `LISTEN` / `pg_notify` (in-txn) | **pub/sub** (fire-and-forget) | **change stream** on a capped coll. | +| Namespace isolation | `namespace` column | key prefix `qw::` | one **database** per namespace | + +### Caveats (be honest about the weakenings) + +- **Redis** has no cross-key ACID transaction; atomicity comes from **Lua** + (single server-side step), so this targets a **single Redis instance**, not + Cluster (Cluster needs all keys in one hash slot). The wake is pub/sub — + fire-and-forget — so a subscriber that is down misses it; the worker's safety + poll covers that, exactly as it does behind PG `LISTEN`. +- **MongoDB** transactions *and* change streams require a **replica set** (a + single-node RS is fine); on a standalone `mongod`, `complete_with_event` / + `fail_with_event` and the wake will fail loudly. `ensure_schema()` pings on + connect. +- **pg** computes `counts()` from the live `status`, so it can never drift; + redis keeps maintained counters (decremented against the *prior* status — see + the audit below); mongo counts live documents. + +## Configuration + +| `configure(...)` key | env (default) | meaning | +|---|---|---| +| `db_backend` | — | `"pg"` (default) / `"redis"` / `"mongodb"` (aliases `postgres`, `mongo`) | +| `db_namespace` | — | tenant scope on a shared server (`""` ⇒ `"default"`) | +| — | `QUEUE_WORKFLOWS_REDIS_URL` | redis DSN (`redis_url_env` renames it) | +| — | `QUEUE_WORKFLOWS_MONGO_URL` | mongo DSN, incl. `?replicaSet=…` / `directConnection=true` | + +Install the optional driver: `pip install 'queue_workflows[redis]'` or +`'queue_workflows[mongodb]'`. Selecting a backend whose driver is missing raises +a clear `ImportError` naming the extra. + +## Tests + +`tests/test_backend_contract.py` is one parametrized suite run against **all three +live servers** (a backend whose server is unreachable is skipped, not failed): + +```bash +docker run -d --name qw_pg -e POSTGRES_PASSWORD=postgres -p 5433:5432 postgres:16 +docker run -d --name qw_redis -p 6380:6379 redis:7 +docker run -d --name qw_mongo -p 27018:27017 mongo:7 --replSet rs0 --bind_ip_all +docker exec qw_mongo mongosh --quiet --eval 'rs.initiate()' + +export QUEUE_WORKFLOWS_TEST_DB_URL="postgresql://postgres:postgres@localhost:5433/queue_workflows_test" +export QUEUE_WORKFLOWS_TEST_REDIS_URL="redis://localhost:6380/0" +export QUEUE_WORKFLOWS_TEST_MONGO_URL="mongodb://localhost:27018/?directConnection=true" +python -m pytest tests/test_backend_contract.py +``` + +The suite pins claim-exactly-once (incl. an 8-thread contention stress test), +lease/renew/reclaim, idempotent terminals, the atomic outbox (both-or-neither + +no duplicate on re-delivery), the wake, heartbeat/control, and cross-namespace +isolation. + +## Audit (v0.3.0) + +The SPI was audited across design / regressions / safety / internal data leakage: + +- **A1 (safety, fixed).** Redis `counts()` used maintained counters that + decremented `:running` unconditionally; a terminal/requeue applied to a job + that wasn't `running` would drift them. Fixed: the Lua now decrements the + job's **prior** status. (pg/mongo derive counts from live status — immune.) +- **A2 (safety, added).** Added a multi-threaded contention test that proves + claim-exactly-once on all three backends, not just sequentially. +- **A3 (design/leakage).** The SPI is additive (see *Integration boundary*); it + does not silently re-home the engine. No driver objects appear in the port, so + PG internals can't leak into redis/mongo call sites. Cross-namespace + claim/read/count/wake are isolated and tested. +- **Regressions:** none — the full engine suite (415 tests) stays green; the new + drivers are imported lazily so a `pg`-only deploy needs neither installed. diff --git a/pyproject.toml b/pyproject.toml index 810c899..8bc6af8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,9 @@ dependencies = [ [project.optional-dependencies] metrics = ["psutil"] # hw_metrics CPU/RAM probe (GPU probe shells out, no dep) -test = ["pytest", "pytest-cov"] +redis = ["redis>=5"] # the 'redis' StorageBackend (queue_workflows.backends.redis) +mongodb = ["pymongo>=4.4"] # the 'mongodb' StorageBackend (needs a replica set) +test = ["pytest", "pytest-cov", "redis>=5", "pymongo>=4.4"] [project.scripts] queue-claim-worker = "queue_workflows.claim_worker:main" # generic CLIs (used standalone / other projects) diff --git a/queue_workflows/__init__.py b/queue_workflows/__init__.py index 5dda315..c344162 100644 --- a/queue_workflows/__init__.py +++ b/queue_workflows/__init__.py @@ -67,6 +67,8 @@ def configure( container_prefix: str | None = None, ingest_queues: frozenset[str] | None = None, ingest_default_budget_s: int | None = None, + db_backend: str | None = None, + db_namespace: str | None = None, ) -> EngineConfig: """Set engine configuration values. Only the passed keyword args are mutated; the rest keep their (ai_leads-byte-compatible) defaults. Returns @@ -101,6 +103,14 @@ def configure( cfg.ingest_queues = iq if ingest_default_budget_s is not None: cfg.ingest_default_budget_s = int(ingest_default_budget_s) + if db_backend is not None: + # Validate against the backend registry (source of truth), imported + # lazily so the package root stays import-light and config a leaf. + from queue_workflows.backends import canonical_backend_name + + cfg.db_backend = canonical_backend_name(db_backend) + if db_namespace is not None: + cfg.db_namespace = str(db_namespace) return cfg diff --git a/queue_workflows/backends/__init__.py b/queue_workflows/backends/__init__.py new file mode 100644 index 0000000..c1804d9 --- /dev/null +++ b/queue_workflows/backends/__init__.py @@ -0,0 +1,149 @@ +"""Storage-backend registry — resolve ``config.db_backend`` to a concrete +:class:`~queue_workflows.backends.base.StorageBackend`. + +Three providers, one file each (split per the request): ``postgres`` (the +reference/default), ``redis``, ``mongodb``. Provider modules are imported +**lazily** — selecting ``pg`` never imports ``redis``/``pymongo``, so those stay +*optional* dependencies (``pip install 'queue_workflows[redis]'`` / +``[mongodb]``) and a pg-only deploy needs neither installed. + +Aliases collapse to a canonical name (``postgres``/``postgresql`` → ``pg``; +``mongo`` → ``mongodb``) so ``configure(db_backend=...)`` is forgiving but the +rest of the code only ever sees the canonical form. +""" + +from __future__ import annotations + +import os +import threading + +from queue_workflows.backends.base import ( + Event, + Job, + StorageBackend, + WakeListener, +) +from queue_workflows.config import get_config + +__all__ = [ + "StorageBackend", + "Job", + "Event", + "WakeListener", + "canonical_backend_name", + "is_known_backend", + "known_backends", + "build_backend", + "get_backend", + "close_all", +] + +#: alias → canonical name. The canonical set is the source of truth ``configure`` +#: validates against. +_ALIASES = { + "pg": "pg", + "postgres": "pg", + "postgresql": "pg", + "redis": "redis", + "mongo": "mongodb", + "mongodb": "mongodb", +} + +# Dotted path to each provider class, imported only when first selected. +_PROVIDERS = { + "pg": ("queue_workflows.backends.postgres", "PostgresBackend"), + "redis": ("queue_workflows.backends.redis", "RedisBackend"), + "mongodb": ("queue_workflows.backends.mongodb", "MongoBackend"), +} + +_instances: dict[tuple[str, str, str], StorageBackend] = {} +_lock = threading.RLock() + + +def known_backends() -> frozenset[str]: + """The canonical backend names (``{"pg", "redis", "mongodb"}``).""" + return frozenset(_PROVIDERS) + + +def canonical_backend_name(name: str) -> str: + """Normalize ``name`` to its canonical form, or raise ``ValueError`` listing + the valid names. Used by ``configure(db_backend=...)`` to fail fast.""" + key = (name or "").strip().lower() + if key not in _ALIASES: + raise ValueError( + f"unknown db_backend {name!r}; valid: " + f"{sorted(set(_ALIASES))} (canonical {sorted(known_backends())})" + ) + return _ALIASES[key] + + +def is_known_backend(name: str) -> bool: + return (name or "").strip().lower() in _ALIASES + + +def _load_provider(canonical: str) -> type[StorageBackend]: + module_path, cls_name = _PROVIDERS[canonical] + import importlib + + try: + module = importlib.import_module(module_path) + except ImportError as exc: # missing optional driver + extra = {"redis": "redis", "mongodb": "mongodb"}.get(canonical, canonical) + raise ImportError( + f"the {canonical!r} backend needs its driver: " + f"pip install 'queue_workflows[{extra}]' ({exc})" + ) from exc + return getattr(module, cls_name) + + +def build_backend(name: str, *, url: str, namespace: str = "") -> StorageBackend: + """Construct a backend explicitly (no config / no caching) — the seam tests + use this to point each provider at its dockerized server + a namespace.""" + cls = _load_provider(canonical_backend_name(name)) + return cls(url=url, namespace=namespace) + + +def _url_for(canonical: str) -> str: + cfg = get_config() + if canonical == "pg": + from queue_workflows.db import db_url + + return db_url() + env_name = cfg.redis_url_env if canonical == "redis" else cfg.mongo_url_env + url = os.environ.get(env_name) + if not url: + raise RuntimeError( + f"db_backend={canonical!r} but {env_name} is not set; " + f"export the {canonical} DSN there (or pass a different env via " + f"configure({'redis_url_env' if canonical == 'redis' else 'mongo_url_env'}=...))." + ) + return url + + +def get_backend(*, namespace: str | None = None) -> StorageBackend: + """Return the process-wide backend for the configured ``db_backend`` + + namespace, building (and caching) it on first use. Cached per + ``(backend, namespace, url)`` so repeated calls reuse one client/pool.""" + cfg = get_config() + canonical = canonical_backend_name(cfg.db_backend) + ns = namespace if namespace is not None else cfg.db_namespace + url = _url_for(canonical) + key = (canonical, ns or "", url) + with _lock: + be = _instances.get(key) + if be is None: + be = _load_provider(canonical)(url=url, namespace=ns or "") + be.ensure_schema() + _instances[key] = be + return be + + +def close_all() -> None: + """Close + drop every cached backend (orchestrator shutdown / test teardown).""" + with _lock: + for be in _instances.values(): + try: + be.close() + except Exception: # teardown best-effort + pass + _instances.clear() diff --git a/queue_workflows/backends/base.py b/queue_workflows/backends/base.py new file mode 100644 index 0000000..e42a98a --- /dev/null +++ b/queue_workflows/backends/base.py @@ -0,0 +1,271 @@ +"""The :class:`StorageBackend` port — a host-agnostic durable work-queue SPI. + +WHY THIS EXISTS. The engine proper is, by design, *Postgres-as-queue*: its +claim loop, lease/reclaim, dispatch outbox and wake are written directly in SQL +(``FOR UPDATE SKIP LOCKED``, ``RETURNING`` CAS, ``pg_notify`` in-trigger). That +is deliberate and stays the reference path. This module factors out the **subset +of behaviour that defines "a queue backend"** into an abstract port so the +*database type* becomes selectable (``configure(db_backend="pg"|"redis"| +"mongodb")``) without each call site speaking SQL. It is **additive**: selecting +``pg`` (the default) changes nothing for existing consumers. + +DESIGN — what the port is, and what it is NOT. + + * It is a **generic durable queue with a transactional outbox**: enqueue → + claim-exactly-once → lease/renew → terminal(+atomic event) → reclaim, plus a + best-effort wake (NOTIFY/pub-sub/change-stream) and the operator heartbeat / + ON-OFF control rows. These are the contracts every backend must honour + identically — pinned by ``tests/test_backend_contract.py``. + * It is **NOT leaky**: no method takes or returns a driver handle (psycopg + cursor, redis pipeline, pymongo session). The outbox atomicity — "go + terminal AND append the event, both-or-neither" — is exposed as a single + high-level call (:meth:`complete_with_event` / :meth:`fail_with_event`) that + each backend implements atomically *in its own idiom* (PG txn / Redis Lua / + Mongo multi-doc txn). Hosts never hold a transaction object, so PG internals + can't bleed into Redis/Mongo call sites (audit dimension: internal data + leakage). + * Each backend instance is **bound to one namespace** (constructor arg). Every + key / row / collection it touches is scoped by that namespace, so two apps + sharing one Redis/Mongo server cannot claim or read each other's jobs. This + is the multi-tenant isolation guard — see the cross-namespace test. + +A backend is constructed with ``(url, namespace)`` and is safe to share across +threads (each call borrows its own connection / pipeline). ``close()`` releases +pooled resources. +""" + +from __future__ import annotations + +import abc +from typing import Any, Iterator, Protocol, TypedDict + +# ── job status vocabulary (identical across backends) ──────────────────────── + +STATUS_QUEUED = "queued" +STATUS_RUNNING = "running" +STATUS_COMPLETED = "completed" +STATUS_FAILED = "failed" +#: Terminal states a row can never leave — the idempotency guard. A second +#: ``mark_*`` / ``*_with_event`` on a row already in one of these is a no-op that +#: returns ``None`` (and writes NO event), exactly like the PG engine's +#: ``UPDATE … WHERE status NOT IN (...) RETURNING *`` shape. +TERMINAL_STATUSES = frozenset({STATUS_COMPLETED, STATUS_FAILED}) + +#: ``""`` namespace ⇒ this literal, so a key/collection always has a real scope. +DEFAULT_NAMESPACE = "default" + + +class Job(TypedDict, total=False): + """The canonical job shape every backend returns (a plain dict at runtime). + + ``lease_expires_at`` / ``created_at`` / ``updated_at`` are epoch seconds + (float) so the shape is backend-neutral — each adapter converts to/from its + native time type. ``payload`` / ``result`` are JSON-able dicts.""" + + id: str + queue: str + namespace: str + status: str + payload: dict[str, Any] + priority: int + attempts: int + claimed_by: str | None + lease_expires_at: float | None + result: dict[str, Any] | None + error: str | None + created_at: float + updated_at: float + + +class Event(TypedDict, total=False): + """An outbox event row (the durable dispatch-event analog).""" + + seq: int + job_id: str + namespace: str + queue: str + event_type: str + detail: dict[str, Any] + created_at: float + + +class WakeListener(Protocol): + """A subscription handle returned by :meth:`StorageBackend.subscribe`. + + Used as a context manager. :meth:`wait` blocks up to ``timeout`` for the + next wake payload (a queue name) and returns it, or ``None`` on timeout — so + a worker loops ``while not stop: q = sub.wait(1.0)`` exactly as it loops on + ``LISTEN`` today (the timeout doubling as the dropped-NOTIFY safety poll).""" + + def __enter__(self) -> WakeListener: ... + def __exit__(self, *exc: object) -> None: ... + def wait(self, timeout: float) -> str | None: ... + + +class StorageBackend(abc.ABC): + """Abstract durable-queue backend. See module docstring for the contract. + + Subclasses: :class:`~queue_workflows.backends.postgres.PostgresBackend`, + :class:`~queue_workflows.backends.redis.RedisBackend`, + :class:`~queue_workflows.backends.mongodb.MongoBackend`. + """ + + #: Registry name (``"pg"`` / ``"redis"`` / ``"mongodb"``); set on subclasses. + name: str = "" + + def __init__(self, *, url: str, namespace: str = "") -> None: + self.url = url + self.namespace = namespace or DEFAULT_NAMESPACE + + # ── schema / lifecycle ──────────────────────────────────────────────────── + + @abc.abstractmethod + def ensure_schema(self) -> None: + """Idempotently create whatever durable structures the backend needs + (PG tables / Mongo indexes; a no-op for Redis). Safe to call on boot.""" + + @abc.abstractmethod + def close(self) -> None: + """Release pooled connections / clients.""" + + # ── enqueue / claim / lease ──────────────────────────────────────────────── + + @abc.abstractmethod + def enqueue( + self, queue: str, payload: dict[str, Any], *, + job_id: str | None = None, priority: int = 0, + ) -> str: + """Append a ``queued`` job and fire the wake NOTIFY for ``queue``. + Returns the job id (generated when ``job_id`` is None). The wake must be + emitted as part of the same durable write (no "queued but no wake").""" + + @abc.abstractmethod + def claim(self, queue: str, worker: str, *, lease_s: float) -> Job | None: + """Atomically take the single oldest claimable (``queued``, highest + priority) job on ``queue``: flip it ``running``, stamp ``claimed_by`` + + ``lease_expires_at = now+lease_s``, bump ``attempts``. Returns the job, + or ``None`` if none claimable. **Exactly-once under contention**: two + concurrent callers never receive the same job (the SKIP-LOCKED guarantee + each backend reproduces in its own idiom).""" + + @abc.abstractmethod + def renew_lease(self, job_id: str, worker: str, *, lease_s: float) -> bool: + """Extend ``lease_expires_at`` to ``now+lease_s`` iff the job is still + ``running`` and still owned by ``worker``. Returns whether it renewed.""" + + @abc.abstractmethod + def reclaim_expired(self, *, queue: str | None = None) -> list[str]: + """Re-queue every ``running`` job whose lease has lapsed (optionally + filtered to ``queue``): flip ``running`` → ``queued``, clear the lease / + owner, re-fire the wake. Returns the reclaimed job ids. This is the sole + recovery path for an orphaned ``running`` row.""" + + @abc.abstractmethod + def requeue_for_retry(self, job_id: str) -> Job | None: + """Watchdog re-queue: flip a ``running`` job back to ``queued`` (clear + lease/owner, keep ``attempts`` as the retry counter), re-fire the wake, + and write **no** event. Returns the updated job, or ``None`` if it was + already terminal.""" + + # ── terminal transitions (idempotent) ────────────────────────────────────── + + @abc.abstractmethod + def mark_completed( + self, job_id: str, *, result: dict[str, Any] | None = None, + ) -> Job | None: + """Flip ``job_id`` → ``completed`` iff not already terminal; stamp + ``result``. Returns the row, or ``None`` if it was already terminal + (the idempotency guard — a duplicate delivery is a safe no-op).""" + + @abc.abstractmethod + def mark_failed(self, job_id: str, *, error: str | None = None) -> Job | None: + """Flip ``job_id`` → ``failed`` iff not already terminal; stamp + ``error``. Returns the row, or ``None`` if already terminal.""" + + @abc.abstractmethod + def complete_with_event( + self, job_id: str, event_type: str, *, + result: dict[str, Any] | None = None, + detail: dict[str, Any] | None = None, + ) -> Job | None: + """ATOMIC outbox: go ``completed`` **and** append one :class:`Event`, + both-or-neither. Returns the row, or ``None`` if already terminal — in + which case **no event is written** (the second-delivery no-op).""" + + @abc.abstractmethod + def fail_with_event( + self, job_id: str, event_type: str, *, + error: str | None = None, + detail: dict[str, Any] | None = None, + ) -> Job | None: + """ATOMIC outbox twin of :meth:`complete_with_event` for the failure + path (mark ``failed`` + append the event in one unit, or neither).""" + + # ── reads ────────────────────────────────────────────────────────────────── + + @abc.abstractmethod + def get(self, job_id: str) -> Job | None: + """Return the job by id (within this namespace), or ``None``.""" + + @abc.abstractmethod + def counts(self, queue: str) -> dict[str, int]: + """``{queued, running, completed, failed}`` for ``queue`` (snapshot).""" + + @abc.abstractmethod + def events(self, *, since: int = 0, limit: int = 1000) -> list[Event]: + """Outbox events with ``seq > since`` (oldest-first), for an event + drainer / the atomicity assertions. Namespace-scoped.""" + + # ── wake (NOTIFY / pub-sub / change-stream) ───────────────────────────────── + + @abc.abstractmethod + def notify(self, queue: str) -> None: + """Best-effort wake for listeners on ``queue`` (out-of-band; ``enqueue`` + already wakes). Mirrors a manual ``pg_notify``.""" + + @abc.abstractmethod + def subscribe(self, *queues: str) -> WakeListener: + """Open a :class:`WakeListener` for the given queues. Best-effort: a + dropped wake is covered by the caller's safety-poll timeout, identical to + the engine's 1 s ``LISTEN`` safety poll.""" + + # ── heartbeats + operator ON/OFF control ──────────────────────────────────── + + @abc.abstractmethod + def heartbeat( + self, host: str, queue: str, *, + current_model: str | None = None, stale_after_s: float = 30.0, + ) -> None: + """Upsert this worker's ``(host, queue)`` liveness row (TTL/last_seen).""" + + @abc.abstractmethod + def workers(self, queue: str) -> list[dict[str, Any]]: + """Live workers on ``queue`` (heartbeat fresher than its TTL).""" + + @abc.abstractmethod + def set_control( + self, host: str, queue: str, *, + desired_state: str, stop_policy: str = "hard", + requested_by: str | None = None, + ) -> None: + """Upsert the operator desired-state row for a ``(host, queue)`` worker.""" + + @abc.abstractmethod + def desired_state(self, host: str, queue: str) -> str: + """Effective desired state: ``"off"`` only when an explicit OFF row + exists, else ``"on"`` (absent ⇒ ON — the default-on contract).""" + + +def normalized_namespace(namespace: str) -> str: + """``""`` → :data:`DEFAULT_NAMESPACE`, else verbatim (trimmed).""" + return (namespace or "").strip() or DEFAULT_NAMESPACE + + +def drain_iter(listener: WakeListener, *, timeout: float) -> Iterator[str]: + """Yield wake payloads until a ``timeout`` elapses with none — a small helper + for tests/consumers. (Backends needn't override.)""" + while True: + q = listener.wait(timeout) + if q is None: + return + yield q diff --git a/queue_workflows/backends/mongodb.py b/queue_workflows/backends/mongodb.py new file mode 100644 index 0000000..5f0ce62 --- /dev/null +++ b/queue_workflows/backends/mongodb.py @@ -0,0 +1,324 @@ +"""MongoDB :class:`StorageBackend`. + +Mongo maps onto the contract more directly than Redis but with its own caveats, +which the audit flagged up front: + + * **Atomic claim via ``find_one_and_update``** — find the oldest, highest + priority ``queued`` job and flip it ``running`` in one atomic document op. + Two concurrent claimers get different jobs because the first's update removes + it from the ``status:'queued'`` filter before the second matches it + (the standard Mongo work-queue claim; reproduces SKIP-LOCKED's effect). + * **Atomic outbox via a MULTI-DOCUMENT TRANSACTION** — go terminal *and* insert + the event in one transaction, both-or-neither. This is why the backend needs + a **replica set** (transactions — and change streams — are unavailable on a + standalone ``mongod``); a single-node RS is enough. + * **Wake via a change stream** on a capped ``wake`` collection (also RS-only). + Each enqueue/reclaim/requeue inserts a tiny ``{queue}`` doc; the stream + surfaces it. Best-effort like every wake — the worker's safety poll covers a + stream that hasn't resumed yet. + +Each namespace is a SEPARATE DATABASE (``qw_``), so two tenants on one +server share nothing — the strongest possible isolation (the data-leakage guard). +""" + +from __future__ import annotations + +import re +import time +import uuid +from datetime import datetime, timezone +from typing import Any + +from pymongo import ASCENDING, DESCENDING, MongoClient, ReturnDocument +from pymongo.errors import CollectionInvalid + +from queue_workflows.backends.base import Event, Job, StorageBackend, WakeListener + +_TERMINAL = ("completed", "failed") + + +def _db_name(namespace: str) -> str: + return "qw_" + re.sub(r"[^A-Za-z0-9_]", "_", namespace)[:48] + + +class MongoBackend(StorageBackend): + name = "mongodb" + + def __init__(self, *, url: str, namespace: str = "") -> None: + super().__init__(url=url, namespace=namespace) + self._client: MongoClient = MongoClient( + url, serverSelectionTimeoutMS=5000, tz_aware=True, + ) + self._db = self._client[_db_name(self.namespace)] + self._jobs = self._db["jobs"] + self._events = self._db["events"] + self._workers = self._db["workers"] + self._controls = self._db["controls"] + self._counters = self._db["counters"] + self._wake = self._db["wake"] + + # ── schema ────────────────────────────────────────────────────────────────── + + def ensure_schema(self) -> None: + # Force server selection now so an unreachable server fails the probe. + self._client.admin.command("ping") + try: + self._db.create_collection("wake", capped=True, size=1 << 20) + except CollectionInvalid: + pass # already exists + self._jobs.create_index( + [("queue", ASCENDING), ("status", ASCENDING), + ("priority", DESCENDING), ("seq", ASCENDING)] + ) + self._jobs.create_index([("status", ASCENDING), ("lease_expires_at", ASCENDING)]) + self._events.create_index([("seq", ASCENDING)]) + self._controls.create_index( + [("host", ASCENDING), ("queue", ASCENDING)], unique=True + ) + # TTL: stale heartbeats self-expire (the liveness window). + self._workers.create_index("last_seen", expireAfterSeconds=60) + self._workers.create_index([("queue", ASCENDING)]) + for cid in ("seq", "eventseq"): + self._counters.update_one({"_id": cid}, {"$setOnInsert": {"n": 0}}, upsert=True) + + def close(self) -> None: + self._client.close() + + # ── counters ────────────────────────────────────────────────────────────────── + + def _next(self, counter_id: str, *, session=None) -> int: + doc = self._counters.find_one_and_update( + {"_id": counter_id}, {"$inc": {"n": 1}}, + upsert=True, return_document=ReturnDocument.AFTER, session=session, + ) + return int(doc["n"]) + + def _wake_insert(self, queue: str) -> None: + try: + self._wake.insert_one({"queue": queue, "ts": time.time()}) + except Exception: + pass # wake is best-effort + + # ── enqueue / claim / lease ──────────────────────────────────────────────── + + def enqueue(self, queue, payload, *, job_id=None, priority=0) -> str: + jid = job_id or uuid.uuid4().hex + now = time.time() + self._jobs.insert_one({ + "_id": jid, "namespace": self.namespace, "queue": queue, + "status": "queued", "payload": payload or {}, "priority": int(priority), + "attempts": 0, "claimed_by": None, "lease_expires_at": None, + "result": None, "error": None, "seq": self._next("seq"), + "created_at": now, "updated_at": now, + }) + self._wake_insert(queue) + return jid + + def claim(self, queue, worker, *, lease_s) -> Job | None: + now = time.time() + doc = self._jobs.find_one_and_update( + {"queue": queue, "status": "queued"}, + {"$set": {"status": "running", "claimed_by": worker, + "lease_expires_at": now + float(lease_s), "updated_at": now}, + "$inc": {"attempts": 1}}, + sort=[("priority", DESCENDING), ("seq", ASCENDING)], + return_document=ReturnDocument.AFTER, + ) + return _job(doc) + + def renew_lease(self, job_id, worker, *, lease_s) -> bool: + now = time.time() + doc = self._jobs.find_one_and_update( + {"_id": job_id, "status": "running", "claimed_by": worker}, + {"$set": {"lease_expires_at": now + float(lease_s), "updated_at": now}}, + return_document=ReturnDocument.AFTER, + ) + return doc is not None + + def reclaim_expired(self, *, queue=None) -> list[str]: + now = time.time() + flt: dict[str, Any] = {"status": "running", "lease_expires_at": {"$lt": now}} + if queue is not None: + flt["queue"] = queue + out: list[str] = [] + while True: + doc = self._jobs.find_one_and_update( + flt, + {"$set": {"status": "queued", "claimed_by": None, + "lease_expires_at": None, "updated_at": time.time()}}, + return_document=ReturnDocument.AFTER, + ) + if doc is None: + break + out.append(doc["_id"]) + self._wake_insert(doc["queue"]) + return out + + def requeue_for_retry(self, job_id) -> Job | None: + doc = self._jobs.find_one_and_update( + {"_id": job_id, "status": {"$nin": list(_TERMINAL)}}, + {"$set": {"status": "queued", "claimed_by": None, + "lease_expires_at": None, "updated_at": time.time()}}, + return_document=ReturnDocument.AFTER, + ) + if doc is not None: + self._wake_insert(doc["queue"]) + return _job(doc) + + # ── terminal transitions ──────────────────────────────────────────────────── + + def _mark(self, job_id, status, *, result, error) -> Job | None: + doc = self._jobs.find_one_and_update( + {"_id": job_id, "status": {"$nin": list(_TERMINAL)}}, + {"$set": {"status": status, "result": result, "error": error, + "updated_at": time.time()}}, + return_document=ReturnDocument.AFTER, + ) + return _job(doc) + + def mark_completed(self, job_id, *, result=None) -> Job | None: + return self._mark(job_id, "completed", result=result, error=None) + + def mark_failed(self, job_id, *, error=None) -> Job | None: + return self._mark(job_id, "failed", result=None, error=error) + + def _terminal_with_event(self, job_id, status, event_type, *, result, error, detail): + # One transaction: the terminal flip + the event insert commit together, + # or — when the job is already terminal (0 docs matched) — neither does. + with self._client.start_session() as session: + with session.start_transaction(): + doc = self._jobs.find_one_and_update( + {"_id": job_id, "status": {"$nin": list(_TERMINAL)}}, + {"$set": {"status": status, "result": result, "error": error, + "updated_at": time.time()}}, + return_document=ReturnDocument.AFTER, session=session, + ) + if doc is None: + return None # already terminal → empty txn, NO event written + self._events.insert_one({ + "seq": self._next("eventseq", session=session), + "namespace": self.namespace, "job_id": job_id, + "queue": doc["queue"], "event_type": event_type, + "detail": detail or {}, "created_at": time.time(), + }, session=session) + return _job(doc) + + def complete_with_event(self, job_id, event_type, *, result=None, detail=None): + return self._terminal_with_event( + job_id, "completed", event_type, result=result, error=None, detail=detail + ) + + def fail_with_event(self, job_id, event_type, *, error=None, detail=None): + return self._terminal_with_event( + job_id, "failed", event_type, result=None, error=error, detail=detail + ) + + # ── reads ──────────────────────────────────────────────────────────────────── + + def get(self, job_id) -> Job | None: + return _job(self._jobs.find_one({"_id": job_id})) + + def counts(self, queue) -> dict[str, int]: + return { + s: self._jobs.count_documents({"queue": queue, "status": s}) + for s in ("queued", "running", "completed", "failed") + } + + def events(self, *, since=0, limit=1000) -> list[Event]: + cur = self._events.find({"seq": {"$gt": since}}).sort("seq", ASCENDING).limit(limit) + return [ + Event( + seq=int(d["seq"]), job_id=d["job_id"], namespace=self.namespace, + queue=d.get("queue"), event_type=d["event_type"], + detail=d.get("detail") or {}, created_at=float(d.get("created_at") or 0), + ) + for d in cur + ] + + # ── wake (change stream) ──────────────────────────────────────────────────── + + def notify(self, queue) -> None: + self._wake_insert(queue) + + def subscribe(self, *queues) -> WakeListener: + return _MongoWakeListener(self._wake, frozenset(queues)) + + # ── heartbeats + control ─────────────────────────────────────────────────────── + + def heartbeat(self, host, queue, *, current_model=None, stale_after_s=30.0) -> None: + self._workers.update_one( + {"_id": f"{queue}:{host}"}, + {"$set": {"host": host, "queue": queue, "current_model": current_model, + "last_seen": datetime.now(timezone.utc)}}, + upsert=True, + ) + + def workers(self, queue) -> list[dict[str, Any]]: + return [ + {"host": d.get("host"), "queue": d.get("queue"), + "current_model": d.get("current_model"), + "last_seen": d["last_seen"].timestamp() if d.get("last_seen") else None} + for d in self._workers.find({"queue": queue}) + ] + + def set_control(self, host, queue, *, desired_state, stop_policy="hard", + requested_by=None) -> None: + self._controls.update_one( + {"host": host, "queue": queue}, + {"$set": {"desired_state": desired_state, "stop_policy": stop_policy, + "requested_by": requested_by, "updated_at": time.time()}}, + upsert=True, + ) + + def desired_state(self, host, queue) -> str: + doc = self._controls.find_one({"host": host, "queue": queue}) + return "off" if (doc and doc.get("desired_state") == "off") else "on" + + +def _job(doc: dict[str, Any] | None) -> Job | None: + if doc is None: + return None + return Job( + id=doc["_id"], queue=doc["queue"], namespace=doc.get("namespace", ""), + status=doc["status"], payload=doc.get("payload") or {}, + priority=int(doc.get("priority") or 0), attempts=int(doc.get("attempts") or 0), + claimed_by=doc.get("claimed_by"), lease_expires_at=doc.get("lease_expires_at"), + result=doc.get("result"), error=doc.get("error"), + created_at=float(doc.get("created_at") or 0), + updated_at=float(doc.get("updated_at") or 0), + ) + + +class _MongoWakeListener: + """Change stream over the capped ``wake`` collection; ``wait`` returns the + inserted queue if it's one we subscribed to.""" + + def __init__(self, wake_collection, queues: frozenset[str]) -> None: + self._coll = wake_collection + self._queues = queues + self._stream = None + + def __enter__(self): + # Open the stream BEFORE the caller enqueues, so the insert is captured. + self._stream = self._coll.watch( + [{"$match": {"operationType": "insert"}}], max_await_time_ms=250, + ) + return self + + def __exit__(self, *exc: object) -> None: + if self._stream is not None: + try: + self._stream.close() + finally: + self._stream = None + + def wait(self, timeout: float) -> str | None: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + change = self._stream.try_next() + if change is None: + continue # try_next already blocked up to max_await_time_ms + q = (change.get("fullDocument") or {}).get("queue") + if q is not None and (not self._queues or q in self._queues): + return q + return None diff --git a/queue_workflows/backends/postgres.py b/queue_workflows/backends/postgres.py new file mode 100644 index 0000000..5c8fa85 --- /dev/null +++ b/queue_workflows/backends/postgres.py @@ -0,0 +1,438 @@ +"""PostgreSQL :class:`StorageBackend` — the reference adapter. + +This is the canonical implementation: it expresses the contract with the exact +primitives the engine proper uses — ``FOR UPDATE SKIP LOCKED`` for the claim, +``UPDATE … WHERE status NOT IN (terminal) RETURNING *`` for idempotent +transitions, a single transaction for the atomic outbox, and ``pg_notify`` for +the wake. It is the yardstick the redis / mongodb adapters are measured against +by ``tests/test_backend_contract.py``. + +It uses its OWN small tables (``qw_jobs`` / ``qw_events`` / ``qw_workers`` / +``qw_controls``), separate from the engine's ``workflow_*`` schema, so enabling +the SPI never collides with — or migrates — a host's existing engine tables. +Every row carries a ``namespace`` column and every query filters on it, so one +Postgres database can host several isolated tenants (the data-leakage guard). +""" + +from __future__ import annotations + +import logging +import re +import threading +import uuid +from contextlib import contextmanager +from datetime import datetime, timezone +from typing import Any, Iterator + +import psycopg +from psycopg.rows import dict_row +from psycopg.types.json import Json +from psycopg_pool import ConnectionPool + +from queue_workflows.backends.base import ( + STATUS_QUEUED, + STATUS_RUNNING, + TERMINAL_STATUSES, + Event, + Job, + StorageBackend, + WakeListener, +) + +log = logging.getLogger(__name__) + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS qw_jobs ( + id TEXT PRIMARY KEY, + namespace TEXT NOT NULL, + queue TEXT NOT NULL, + status TEXT NOT NULL, + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + priority INTEGER NOT NULL DEFAULT 0, + attempts INTEGER NOT NULL DEFAULT 0, + claimed_by TEXT, + lease_expires_at TIMESTAMPTZ, + result JSONB, + error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX IF NOT EXISTS qw_jobs_claim_idx + ON qw_jobs (namespace, queue, status, priority DESC, created_at); + +CREATE TABLE IF NOT EXISTS qw_events ( + seq BIGSERIAL PRIMARY KEY, + namespace TEXT NOT NULL, + job_id TEXT NOT NULL, + queue TEXT, + event_type TEXT NOT NULL, + detail JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX IF NOT EXISTS qw_events_ns_seq_idx ON qw_events (namespace, seq); + +CREATE TABLE IF NOT EXISTS qw_workers ( + namespace TEXT NOT NULL, + host TEXT NOT NULL, + queue TEXT NOT NULL, + current_model TEXT, + last_seen TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (namespace, host, queue) +); + +CREATE TABLE IF NOT EXISTS qw_controls ( + namespace TEXT NOT NULL, + host TEXT NOT NULL, + queue TEXT NOT NULL, + desired_state TEXT NOT NULL DEFAULT 'on', + stop_policy TEXT NOT NULL DEFAULT 'hard', + requested_by TEXT, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (namespace, host, queue) +); +""" + + +def _epoch(dt: datetime | None) -> float | None: + return dt.timestamp() if isinstance(dt, datetime) else None + + +def _channel(namespace: str) -> str: + """A NOTIFY channel name scoped to the namespace (so a wake never crosses + tenants). Sanitized to a valid unquoted identifier; the queue rides the + payload so one channel per namespace suffices.""" + safe = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)[:48] + return f"qwbe_{safe}" + + +class PostgresBackend(StorageBackend): + name = "pg" + + def __init__(self, *, url: str, namespace: str = "") -> None: + super().__init__(url=url, namespace=namespace) + self._pool: ConnectionPool | None = None + self._lock = threading.Lock() + + # ── pool / schema ────────────────────────────────────────────────────────── + + def _ensure_pool(self) -> ConnectionPool: + if self._pool is None: + with self._lock: + if self._pool is None: + pool = ConnectionPool( + self.url, min_size=1, max_size=5, open=False, + kwargs={"row_factory": dict_row}, + ) + pool.open(wait=True, timeout=15.0) + self._pool = pool + return self._pool + + @contextmanager + def _conn(self) -> Iterator[psycopg.Connection]: + with self._ensure_pool().connection() as conn: + yield conn + + def ensure_schema(self) -> None: + with self._conn() as conn: + conn.execute(_SCHEMA) + conn.commit() + + def close(self) -> None: + with self._lock: + if self._pool is not None: + self._pool.close() + self._pool = None + + # ── enqueue / claim / lease ──────────────────────────────────────────────── + + def enqueue( + self, queue: str, payload: dict[str, Any], *, + job_id: str | None = None, priority: int = 0, + ) -> str: + jid = job_id or uuid.uuid4().hex + with self._conn() as conn: + conn.execute( + "INSERT INTO qw_jobs (id, namespace, queue, status, payload, priority) " + "VALUES (%s, %s, %s, %s, %s, %s)", + (jid, self.namespace, queue, STATUS_QUEUED, Json(payload or {}), priority), + ) + # The wake rides the same commit as the insert (no "queued but no wake"). + conn.execute("SELECT pg_notify(%s, %s)", (_channel(self.namespace), queue)) + conn.commit() + return jid + + _CLAIM = """ + UPDATE qw_jobs SET + status = 'running', claimed_by = %(worker)s, + lease_expires_at = now() + make_interval(secs => %(lease)s), + attempts = attempts + 1, updated_at = now() + WHERE id = ( + SELECT id FROM qw_jobs + WHERE namespace = %(ns)s AND queue = %(queue)s AND status = 'queued' + ORDER BY priority DESC, created_at, id + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + RETURNING * + """ + + def claim(self, queue: str, worker: str, *, lease_s: float) -> Job | None: + with self._conn() as conn: + row = conn.execute( + self._CLAIM, + {"worker": worker, "lease": float(lease_s), "ns": self.namespace, + "queue": queue}, + ).fetchone() + conn.commit() + return _row_to_job(row) + + def renew_lease(self, job_id: str, worker: str, *, lease_s: float) -> bool: + with self._conn() as conn: + row = conn.execute( + "UPDATE qw_jobs SET lease_expires_at = now() + make_interval(secs => %s), " + "updated_at = now() " + "WHERE id = %s AND namespace = %s AND status = 'running' " + "AND claimed_by = %s RETURNING id", + (float(lease_s), job_id, self.namespace, worker), + ).fetchone() + conn.commit() + return row is not None + + def reclaim_expired(self, *, queue: str | None = None) -> list[str]: + sql = ( + "UPDATE qw_jobs SET status = 'queued', claimed_by = NULL, " + "lease_expires_at = NULL, updated_at = now() " + "WHERE namespace = %s AND status = 'running' AND lease_expires_at < now()" + ) + params: list[Any] = [self.namespace] + if queue is not None: + sql += " AND queue = %s" + params.append(queue) + sql += " RETURNING id, queue" + with self._conn() as conn: + rows = conn.execute(sql, params).fetchall() + for r in rows: # re-fire the wake for each reclaimed queue + conn.execute( + "SELECT pg_notify(%s, %s)", (_channel(self.namespace), r["queue"]) + ) + conn.commit() + return [r["id"] for r in rows] + + def requeue_for_retry(self, job_id: str) -> Job | None: + with self._conn() as conn: + row = conn.execute( + "UPDATE qw_jobs SET status = 'queued', claimed_by = NULL, " + "lease_expires_at = NULL, updated_at = now() " + "WHERE id = %s AND namespace = %s " + "AND status NOT IN ('completed', 'failed') RETURNING *", + (job_id, self.namespace), + ).fetchone() + if row is not None: + conn.execute( + "SELECT pg_notify(%s, %s)", (_channel(self.namespace), row["queue"]) + ) + conn.commit() + return _row_to_job(row) + + # ── terminal transitions ──────────────────────────────────────────────────── + + def _mark(self, job_id: str, status: str, *, result, error) -> Job | None: + with self._conn() as conn: + row = self._mark_in(conn, job_id, status, result=result, error=error) + conn.commit() + return _row_to_job(row) + + def _mark_in(self, conn, job_id, status, *, result, error): + return conn.execute( + "UPDATE qw_jobs SET status = %s, result = %s, error = %s, updated_at = now() " + "WHERE id = %s AND namespace = %s AND status NOT IN ('completed', 'failed') " + "RETURNING *", + (status, Json(result) if result is not None else None, error, + job_id, self.namespace), + ).fetchone() + + def mark_completed(self, job_id, *, result=None) -> Job | None: + return self._mark(job_id, "completed", result=result, error=None) + + def mark_failed(self, job_id, *, error=None) -> Job | None: + return self._mark(job_id, "failed", result=None, error=error) + + def _terminal_with_event( + self, job_id, status, event_type, *, result, error, detail, + ) -> Job | None: + # ONE transaction: the terminal UPDATE and the event INSERT commit + # together, or (when the row is already terminal → 0 rows updated) neither + # is written. This is the outbox-atomicity contract in its purest form. + with self._conn() as conn: + row = self._mark_in(conn, job_id, status, result=result, error=error) + if row is None: + conn.rollback() + return None + conn.execute( + "INSERT INTO qw_events (namespace, job_id, queue, event_type, detail) " + "VALUES (%s, %s, %s, %s, %s)", + (self.namespace, job_id, row["queue"], event_type, Json(detail or {})), + ) + conn.commit() + return _row_to_job(row) + + def complete_with_event(self, job_id, event_type, *, result=None, detail=None): + return self._terminal_with_event( + job_id, "completed", event_type, result=result, error=None, detail=detail + ) + + def fail_with_event(self, job_id, event_type, *, error=None, detail=None): + return self._terminal_with_event( + job_id, "failed", event_type, result=None, error=error, detail=detail + ) + + # ── reads ──────────────────────────────────────────────────────────────────── + + def get(self, job_id: str) -> Job | None: + with self._conn() as conn: + row = conn.execute( + "SELECT * FROM qw_jobs WHERE id = %s AND namespace = %s", + (job_id, self.namespace), + ).fetchone() + return _row_to_job(row) + + def counts(self, queue: str) -> dict[str, int]: + with self._conn() as conn: + rows = conn.execute( + "SELECT status, count(*) AS n FROM qw_jobs " + "WHERE namespace = %s AND queue = %s GROUP BY status", + (self.namespace, queue), + ).fetchall() + out = {"queued": 0, "running": 0, "completed": 0, "failed": 0} + for r in rows: + out[r["status"]] = int(r["n"]) + return out + + def events(self, *, since: int = 0, limit: int = 1000) -> list[Event]: + with self._conn() as conn: + rows = conn.execute( + "SELECT seq, job_id, queue, event_type, detail, created_at " + "FROM qw_events WHERE namespace = %s AND seq > %s " + "ORDER BY seq LIMIT %s", + (self.namespace, since, limit), + ).fetchall() + return [ + Event( + seq=int(r["seq"]), job_id=r["job_id"], namespace=self.namespace, + queue=r["queue"], event_type=r["event_type"], + detail=r["detail"] or {}, created_at=_epoch(r["created_at"]), + ) + for r in rows + ] + + # ── wake ────────────────────────────────────────────────────────────────────── + + def notify(self, queue: str) -> None: + with self._conn() as conn: + conn.execute("SELECT pg_notify(%s, %s)", (_channel(self.namespace), queue)) + conn.commit() + + def subscribe(self, *queues: str) -> WakeListener: + return _PgWakeListener(self.url, _channel(self.namespace), frozenset(queues)) + + # ── heartbeats + control ─────────────────────────────────────────────────────── + + def heartbeat(self, host, queue, *, current_model=None, stale_after_s=30.0) -> None: + with self._conn() as conn: + conn.execute( + "INSERT INTO qw_workers (namespace, host, queue, current_model, last_seen) " + "VALUES (%s, %s, %s, %s, now()) " + "ON CONFLICT (namespace, host, queue) DO UPDATE " + "SET current_model = EXCLUDED.current_model, last_seen = now()", + (self.namespace, host, queue, current_model), + ) + conn.commit() + + def workers(self, queue: str) -> list[dict[str, Any]]: + with self._conn() as conn: + rows = conn.execute( + "SELECT host, queue, current_model, last_seen FROM qw_workers " + "WHERE namespace = %s AND queue = %s " + "AND last_seen > now() - interval '60 seconds'", + (self.namespace, queue), + ).fetchall() + return [ + {"host": r["host"], "queue": r["queue"], + "current_model": r["current_model"], "last_seen": _epoch(r["last_seen"])} + for r in rows + ] + + def set_control(self, host, queue, *, desired_state, stop_policy="hard", + requested_by=None) -> None: + with self._conn() as conn: + conn.execute( + "INSERT INTO qw_controls " + "(namespace, host, queue, desired_state, stop_policy, requested_by, updated_at) " + "VALUES (%s, %s, %s, %s, %s, %s, now()) " + "ON CONFLICT (namespace, host, queue) DO UPDATE " + "SET desired_state = EXCLUDED.desired_state, " + "stop_policy = EXCLUDED.stop_policy, " + "requested_by = EXCLUDED.requested_by, updated_at = now()", + (self.namespace, host, queue, desired_state, stop_policy, requested_by), + ) + conn.commit() + + def desired_state(self, host: str, queue: str) -> str: + with self._conn() as conn: + row = conn.execute( + "SELECT desired_state FROM qw_controls " + "WHERE namespace = %s AND host = %s AND queue = %s", + (self.namespace, host, queue), + ).fetchone() + return "off" if (row and row["desired_state"] == "off") else "on" + + +def _row_to_job(row: dict[str, Any] | None) -> Job | None: + if row is None: + return None + return Job( + id=row["id"], queue=row["queue"], namespace=row["namespace"], + status=row["status"], payload=row["payload"] or {}, + priority=int(row["priority"]), attempts=int(row["attempts"]), + claimed_by=row["claimed_by"], lease_expires_at=_epoch(row["lease_expires_at"]), + result=row["result"], error=row["error"], + created_at=_epoch(row["created_at"]), updated_at=_epoch(row["updated_at"]), + ) + + +class _PgWakeListener: + """LISTEN on the namespace channel; ``wait`` returns the queue payload if it + is one we subscribed to (else keeps waiting until the timeout). Uses its own + autocommit connection, as LISTEN must live outside the pooled txns.""" + + def __init__(self, url: str, channel: str, queues: frozenset[str]) -> None: + self._url = url + self._channel = channel + self._queues = queues + self._conn: psycopg.Connection | None = None + + def __enter__(self) -> "_PgWakeListener": + self._conn = psycopg.connect(self._url, autocommit=True) + # channel is a sanitized identifier; quote defensively all the same. + self._conn.execute(f'LISTEN "{self._channel}"') + return self + + def __exit__(self, *exc: object) -> None: + if self._conn is not None: + self._conn.close() + self._conn = None + + def wait(self, timeout: float) -> str | None: + import time + + assert self._conn is not None + deadline = time.monotonic() + timeout + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + return None + for note in self._conn.notifies(timeout=remaining, stop_after=1): + if not self._queues or note.payload in self._queues: + return note.payload + # not one of ours — keep waiting within the remaining budget + # loop re-checks the deadline diff --git a/queue_workflows/backends/redis.py b/queue_workflows/backends/redis.py new file mode 100644 index 0000000..5a1b143 --- /dev/null +++ b/queue_workflows/backends/redis.py @@ -0,0 +1,374 @@ +"""Redis :class:`StorageBackend`. + +Redis has no SQL, no ``SKIP LOCKED`` and no cross-key ACID transaction, so the +contract is reproduced with the tools Redis *does* give us: + + * **Atomic claim / terminal / re-queue via Lua.** A registered Lua script runs + server-side as one indivisible step, which is how we get the engine's two + keystone guarantees on Redis: claim-exactly-once (no two callers pop the same + job) and the atomic outbox (go terminal *and* append the event, or neither — + a single script, so a crash can't split them). + * **Priority + FIFO via a sorted set per queue** (``…:q:``): score is + ``-priority`` so ``ZPOPMIN`` takes the highest priority first, and equal + scores break ties by member — a zero-padded monotonic sequence — i.e. FIFO. + * **Lease/reclaim via a per-queue running sorted set** scored by lease expiry; + ``reclaim_expired`` is a ``ZRANGEBYSCORE … now`` sweep. + * **Wake via pub/sub** on a per-namespace channel (payload = queue). Pub/sub is + fire-and-forget — a subscriber that is down misses the message — which is why + the worker keeps a safety poll exactly as it does behind PG ``LISTEN``. + +Everything is under the key prefix ``qw::`` so two tenants on one +Redis server are fully isolated (the data-leakage guard). NOTE: keys are derived +server-side inside the scripts, so this targets a **single Redis instance**, not +Cluster (which would require all keys in one hash slot). +""" + +from __future__ import annotations + +import json +import time +from typing import Any + +import redis # the redis-py client (absolute import; not this module) + +from queue_workflows.backends.base import Event, Job, StorageBackend, WakeListener + +# ── Lua (each script is ONE atomic server-side step) ───────────────────────── + +_LUA_ENQUEUE = """ +local p = ARGV[1] +local seq = redis.call('INCR', p..'seq') +local jobkey = p..'job:'..ARGV[2] +redis.call('HSET', jobkey, 'id',ARGV[2],'namespace',ARGV[7],'queue',ARGV[3], + 'status','queued','payload',ARGV[4],'priority',ARGV[5],'attempts','0', + 'claimed_by','','lease_expires_at','','result','','error','', + 'created_at',ARGV[6],'updated_at',ARGV[6]) +redis.call('ZADD', KEYS[1], -tonumber(ARGV[5]), string.format('%020d|%s', seq, ARGV[2])) +redis.call('SADD', p..'queues', ARGV[3]) +redis.call('INCR', p..'cnt:'..ARGV[3]..':queued') +redis.call('PUBLISH', ARGV[8], ARGV[3]) +return ARGV[2] +""" + +_LUA_CLAIM = """ +local p = ARGV[1] +local popped = redis.call('ZPOPMIN', KEYS[1]) +if popped[1] == nil then return nil end +local member = popped[1] +local bar = string.find(member, '|', 1, true) +local jobid = string.sub(member, bar + 1) +local jobkey = p..'job:'..jobid +local attempts = tonumber(redis.call('HGET', jobkey, 'attempts') or '0') + 1 +local lease_at = tonumber(ARGV[5]) + tonumber(ARGV[4]) +redis.call('HSET', jobkey, 'status','running','claimed_by',ARGV[3], + 'lease_expires_at', tostring(lease_at), 'attempts', tostring(attempts), + 'updated_at', ARGV[5]) +redis.call('ZADD', p..'running:'..ARGV[2], lease_at, jobid) +redis.call('DECR', p..'cnt:'..ARGV[2]..':queued') +redis.call('INCR', p..'cnt:'..ARGV[2]..':running') +return redis.call('HGETALL', jobkey) +""" + +_LUA_RENEW = """ +local p = ARGV[1] +local st = redis.call('HGET', KEYS[1], 'status') +local cb = redis.call('HGET', KEYS[1], 'claimed_by') +if st == 'running' and cb == ARGV[2] then + local q = redis.call('HGET', KEYS[1], 'queue') + local lease_at = tonumber(ARGV[4]) + tonumber(ARGV[3]) + redis.call('HSET', KEYS[1], 'lease_expires_at', tostring(lease_at), 'updated_at', ARGV[4]) + redis.call('ZADD', p..'running:'..q, lease_at, ARGV[5]) + return 1 +end +return 0 +""" + +# Mark terminal (+ optionally append one event) — the atomic outbox. ARGV[7] +# empty ⇒ plain mark_* with no event. Already-terminal/missing ⇒ nil (no-op, and +# NO event written): the idempotency guard. +_LUA_TERMINAL = """ +local p = ARGV[1] +local st = redis.call('HGET', KEYS[1], 'status') +if (not st) or st == 'completed' or st == 'failed' then return nil end +local q = redis.call('HGET', KEYS[1], 'queue') +redis.call('HSET', KEYS[1], 'status',ARGV[2],'result',ARGV[3],'error',ARGV[4],'updated_at',ARGV[5]) +redis.call('ZREM', p..'running:'..q, ARGV[6]) +redis.call('DECR', p..'cnt:'..q..':'..st) +redis.call('INCR', p..'cnt:'..q..':'..ARGV[2]) +if ARGV[7] ~= '' then + local eseq = redis.call('INCR', p..'eventseq') + redis.call('HSET', p..'event:'..eseq, 'seq',eseq,'job_id',ARGV[6],'queue',q, + 'event_type',ARGV[7],'detail',ARGV[8],'created_at',ARGV[5]) + redis.call('RPUSH', p..'events', eseq) +end +return redis.call('HGETALL', KEYS[1]) +""" + +_LUA_REQUEUE = """ +local p = ARGV[1] +local st = redis.call('HGET', KEYS[1], 'status') +if (not st) or st == 'completed' or st == 'failed' then return nil end +local q = redis.call('HGET', KEYS[1], 'queue') +local pri = tonumber(redis.call('HGET', KEYS[1], 'priority') or '0') +redis.call('HSET', KEYS[1], 'status','queued','claimed_by','','lease_expires_at','','updated_at',ARGV[2]) +redis.call('ZREM', p..'running:'..q, ARGV[4]) +local seq = redis.call('INCR', p..'seq') +redis.call('ZADD', p..'q:'..q, -pri, string.format('%020d|%s', seq, ARGV[4])) +redis.call('DECR', p..'cnt:'..q..':'..st) +redis.call('INCR', p..'cnt:'..q..':queued') +redis.call('PUBLISH', ARGV[3], q) +return redis.call('HGETALL', KEYS[1]) +""" + +_LUA_RECLAIM = """ +local p = ARGV[1] +local expired = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[3]) +local out = {} +for i, jobid in ipairs(expired) do + local jobkey = p..'job:'..jobid + if redis.call('HGET', jobkey, 'status') == 'running' then + redis.call('HSET', jobkey, 'status','queued','claimed_by','','lease_expires_at','','updated_at',ARGV[3]) + redis.call('ZREM', KEYS[1], jobid) + local seq = redis.call('INCR', p..'seq') + local pri = tonumber(redis.call('HGET', jobkey, 'priority') or '0') + redis.call('ZADD', p..'q:'..ARGV[2], -pri, string.format('%020d|%s', seq, jobid)) + redis.call('DECR', p..'cnt:'..ARGV[2]..':running') + redis.call('INCR', p..'cnt:'..ARGV[2]..':queued') + redis.call('PUBLISH', ARGV[4], ARGV[2]) + out[#out + 1] = jobid + end +end +return out +""" + + +class RedisBackend(StorageBackend): + name = "redis" + + def __init__(self, *, url: str, namespace: str = "") -> None: + super().__init__(url=url, namespace=namespace) + self._r = redis.Redis.from_url(url, decode_responses=True) + self._prefix = f"qw:{self.namespace}:" + self._wake_channel = f"{self._prefix}wake" + self._enqueue = self._r.register_script(_LUA_ENQUEUE) + self._claim = self._r.register_script(_LUA_CLAIM) + self._renew = self._r.register_script(_LUA_RENEW) + self._terminal = self._r.register_script(_LUA_TERMINAL) + self._requeue = self._r.register_script(_LUA_REQUEUE) + self._reclaim = self._r.register_script(_LUA_RECLAIM) + + def _k(self, suffix: str) -> str: + return self._prefix + suffix + + def ensure_schema(self) -> None: + self._r.ping() # nothing to create; just verify connectivity + + def close(self) -> None: + try: + self._r.close() + except Exception: + pass + + # ── enqueue / claim / lease ──────────────────────────────────────────────── + + def enqueue(self, queue, payload, *, job_id=None, priority=0) -> str: + import uuid + + jid = job_id or uuid.uuid4().hex + self._enqueue( + keys=[self._k(f"q:{queue}")], + args=[self._prefix, jid, queue, json.dumps(payload or {}), + str(int(priority)), repr(time.time()), self.namespace, + self._wake_channel], + ) + return jid + + def claim(self, queue, worker, *, lease_s) -> Job | None: + reply = self._claim( + keys=[self._k(f"q:{queue}")], + args=[self._prefix, queue, worker, repr(float(lease_s)), repr(time.time())], + ) + return self._job(reply) + + def renew_lease(self, job_id, worker, *, lease_s) -> bool: + return bool(self._renew( + keys=[self._k(f"job:{job_id}")], + args=[self._prefix, worker, repr(float(lease_s)), repr(time.time()), job_id], + )) + + def reclaim_expired(self, *, queue=None) -> list[str]: + queues = [queue] if queue is not None else list(self._r.smembers(self._k("queues"))) + now = repr(time.time()) + out: list[str] = [] + for q in queues: + ids = self._reclaim( + keys=[self._k(f"running:{q}")], + args=[self._prefix, q, now, self._wake_channel], + ) + out.extend(ids or []) + return out + + def requeue_for_retry(self, job_id) -> Job | None: + reply = self._requeue( + keys=[self._k(f"job:{job_id}")], + args=[self._prefix, repr(time.time()), self._wake_channel, job_id], + ) + return self._job(reply) + + # ── terminal transitions ──────────────────────────────────────────────────── + + def _do_terminal(self, job_id, status, *, result, error, event_type, detail): + reply = self._terminal( + keys=[self._k(f"job:{job_id}")], + args=[self._prefix, status, + json.dumps(result) if result is not None else "", + error or "", repr(time.time()), job_id, + event_type or "", json.dumps(detail or {})], + ) + return self._job(reply) + + def mark_completed(self, job_id, *, result=None) -> Job | None: + return self._do_terminal(job_id, "completed", result=result, error=None, + event_type="", detail=None) + + def mark_failed(self, job_id, *, error=None) -> Job | None: + return self._do_terminal(job_id, "failed", result=None, error=error, + event_type="", detail=None) + + def complete_with_event(self, job_id, event_type, *, result=None, detail=None): + return self._do_terminal(job_id, "completed", result=result, error=None, + event_type=event_type, detail=detail) + + def fail_with_event(self, job_id, event_type, *, error=None, detail=None): + return self._do_terminal(job_id, "failed", result=None, error=error, + event_type=event_type, detail=detail) + + # ── reads ──────────────────────────────────────────────────────────────────── + + def get(self, job_id) -> Job | None: + h = self._r.hgetall(self._k(f"job:{job_id}")) + return self._job_from_dict(h) if h else None + + def counts(self, queue) -> dict[str, int]: + keys = [self._k(f"cnt:{queue}:{s}") for s in + ("queued", "running", "completed", "failed")] + vals = self._r.mget(keys) + return { + s: max(0, int(v or 0)) + for s, v in zip(("queued", "running", "completed", "failed"), vals) + } + + def events(self, *, since=0, limit=1000) -> list[Event]: + seqs = self._r.lrange(self._k("events"), 0, -1) + wanted = [s for s in seqs if int(s) > since][:limit] + out: list[Event] = [] + for s in wanted: + h = self._r.hgetall(self._k(f"event:{s}")) + if not h: + continue + out.append(Event( + seq=int(h["seq"]), job_id=h["job_id"], namespace=self.namespace, + queue=h.get("queue") or None, event_type=h["event_type"], + detail=json.loads(h.get("detail") or "{}"), + created_at=float(h.get("created_at") or 0), + )) + return out + + # ── wake ────────────────────────────────────────────────────────────────────── + + def notify(self, queue) -> None: + self._r.publish(self._wake_channel, queue) + + def subscribe(self, *queues) -> WakeListener: + return _RedisWakeListener(self._r, self._wake_channel, frozenset(queues)) + + # ── heartbeats + control ─────────────────────────────────────────────────────── + + def heartbeat(self, host, queue, *, current_model=None, stale_after_s=30.0) -> None: + key = self._k(f"worker:{queue}:{host}") + self._r.hset(key, mapping={ + "host": host, "queue": queue, "current_model": current_model or "", + "last_seen": repr(time.time()), + }) + self._r.expire(key, max(1, int(stale_after_s))) + + def workers(self, queue) -> list[dict[str, Any]]: + out = [] + for key in self._r.scan_iter(match=self._k(f"worker:{queue}:*"), count=100): + h = self._r.hgetall(key) + if h: + out.append({"host": h.get("host"), "queue": h.get("queue"), + "current_model": h.get("current_model") or None, + "last_seen": float(h.get("last_seen") or 0)}) + return out + + def set_control(self, host, queue, *, desired_state, stop_policy="hard", + requested_by=None) -> None: + self._r.hset(self._k(f"control:{host}:{queue}"), mapping={ + "desired_state": desired_state, "stop_policy": stop_policy, + "requested_by": requested_by or "", "updated_at": repr(time.time()), + }) + + def desired_state(self, host, queue) -> str: + st = self._r.hget(self._k(f"control:{host}:{queue}"), "desired_state") + return "off" if st == "off" else "on" + + # ── helpers ────────────────────────────────────────────────────────────────── + + def _job(self, reply) -> Job | None: + """Script HGETALL replies arrive as a flat ``[k, v, k, v, …]`` list.""" + if not reply: + return None + if isinstance(reply, list): + reply = dict(zip(reply[::2], reply[1::2])) + return self._job_from_dict(reply) + + def _job_from_dict(self, h: dict[str, str]) -> Job: + return Job( + id=h["id"], queue=h["queue"], namespace=h.get("namespace", self.namespace), + status=h["status"], payload=json.loads(h.get("payload") or "{}"), + priority=int(h.get("priority") or 0), attempts=int(h.get("attempts") or 0), + claimed_by=(h.get("claimed_by") or None), + lease_expires_at=(float(h["lease_expires_at"]) if h.get("lease_expires_at") else None), + result=(json.loads(h["result"]) if h.get("result") else None), + error=(h.get("error") or None), + created_at=float(h.get("created_at") or 0), + updated_at=float(h.get("updated_at") or 0), + ) + + +class _RedisWakeListener: + """pub/sub subscription; ``wait`` returns the queue payload if subscribed.""" + + def __init__(self, client, channel: str, queues: frozenset[str]) -> None: + self._client = client + self._channel = channel + self._queues = queues + self._pubsub = None + + def __enter__(self): + self._pubsub = self._client.pubsub(ignore_subscribe_messages=True) + self._pubsub.subscribe(self._channel) + return self + + def __exit__(self, *exc: object) -> None: + if self._pubsub is not None: + try: + self._pubsub.close() + finally: + self._pubsub = None + + def wait(self, timeout: float) -> str | None: + deadline = time.monotonic() + timeout + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + return None + msg = self._pubsub.get_message(timeout=remaining) + if msg is None: + continue + if msg.get("type") != "message": + continue + payload = msg.get("data") + if not self._queues or payload in self._queues: + return payload diff --git a/queue_workflows/config.py b/queue_workflows/config.py index 13a2b6d..d25c9c6 100644 --- a/queue_workflows/config.py +++ b/queue_workflows/config.py @@ -60,6 +60,10 @@ class EngineConfig: db_url_env: str = "AI_LEADS_DB_URL" host_label_env: str = "AI_LEADS_HOST_LABEL" host_priority_env: str = "AI_LEADS_GPU_CONSUMER_PRIORITY" + #: env vars holding the redis / mongodb DSN for those backends (read only when + #: ``db_backend`` selects them). New names (no ai_leads equivalent). + redis_url_env: str = "QUEUE_WORKFLOWS_REDIS_URL" + mongo_url_env: str = "QUEUE_WORKFLOWS_MONGO_URL" # ── value config ────────────────────────────────────────────────────────── #: GPU model ids on the tight per-job video render budget (claim_worker). @@ -71,6 +75,20 @@ class EngineConfig: #: cgroup-attribution container-name prefix (hw_metrics per-container slice). container_prefix: str = "ai_leads-" + # ── storage backend selection (pluggable DB type) ────────────────────────── + #: Which provider the StorageBackend SPI (``queue_workflows.backends``) + #: resolves to: ``"pg"`` (default — Postgres, byte-compat), ``"redis"``, or + #: ``"mongodb"``. The legacy engine modules always use Postgres directly; this + #: only selects the backend the generic durable-queue SPI hands out. Validated + #: against the backend registry by ``configure()``. + db_backend: str = "pg" + #: Logical namespace isolating THIS tenant's jobs on a SHARED redis/mongodb + #: server — every key/collection is scoped by it, so two apps pointed at one + #: server can't claim or read each other's jobs (the multi-tenant data-leakage + #: guard). ``""`` ⇒ the literal namespace ``"default"``. For pg it scopes the + #: SPI rows via a ``namespace`` column. + db_namespace: str = "" + # ── node-module resolver (overrides node_module_package when set) ────────── #: ``Callable[[str], module]`` — resolve a stored ``node_module`` string to #: an imported module exposing ``run(...)``. Default builds from diff --git a/tests/test_backend_contract.py b/tests/test_backend_contract.py new file mode 100644 index 0000000..afad243 --- /dev/null +++ b/tests/test_backend_contract.py @@ -0,0 +1,343 @@ +"""The storage-backend CONTRACT — one parametrized suite every provider must +satisfy identically (pg / redis / mongodb). + +This is the spec (TDD): the adapters in ``queue_workflows/backends/`` are written +to make these pass. Each backend is exercised against a real server; a backend +whose server isn't reachable (env unset / down) is SKIPPED, not failed, so the +suite runs anywhere — but CI/release must show all three green (see +``docs/storage_backends.md``). + +Servers come from env (dockerized in dev): + * pg — ``QUEUE_WORKFLOWS_TEST_DB_URL`` (shared with the engine suite) + * redis — ``QUEUE_WORKFLOWS_TEST_REDIS_URL`` + * mongodb — ``QUEUE_WORKFLOWS_TEST_MONGO_URL`` (replica set: txns + change streams) + +Each test gets a FRESH random namespace, so tests never see each other's jobs +(and the cross-namespace test doubles as the data-leakage guard). +""" + +from __future__ import annotations + +import os +import time +import uuid + +import pytest + +from queue_workflows.backends import build_backend +from queue_workflows.backends.base import ( + STATUS_COMPLETED, + STATUS_FAILED, + STATUS_QUEUED, + STATUS_RUNNING, +) + +_URLS = { + "pg": os.environ.get("QUEUE_WORKFLOWS_TEST_DB_URL"), + "redis": os.environ.get("QUEUE_WORKFLOWS_TEST_REDIS_URL"), + "mongodb": os.environ.get("QUEUE_WORKFLOWS_TEST_MONGO_URL"), +} + +# Probe each backend once per session; cache (ok, reason) so we don't reconnect +# for every parametrized test. +_REACHABLE: dict[str, tuple[bool, str]] = {} + + +def _reachable(name: str) -> tuple[bool, str]: + if name in _REACHABLE: + return _REACHABLE[name] + url = _URLS.get(name) + if not url: + res = (False, f"{name}: set QUEUE_WORKFLOWS_TEST_{name.upper()}_URL") + else: + try: + be = build_backend(name, url=url, namespace="probe") + be.ensure_schema() + be.close() + res = (True, "") + except Exception as exc: # server down / driver missing + res = (False, f"{name} unreachable: {type(exc).__name__}: {exc}") + _REACHABLE[name] = res + return res + + +@pytest.fixture(params=["pg", "redis", "mongodb"]) +def backend(request): + name = request.param + ok, why = _reachable(name) + if not ok: + pytest.skip(why) + ns = f"t_{uuid.uuid4().hex[:12]}" + be = build_backend(name, url=_URLS[name], namespace=ns) + be.ensure_schema() + yield be + be.close() + + +# ── enqueue / get ───────────────────────────────────────────────────────────── + + +def test_enqueue_then_get(backend): + jid = backend.enqueue("cpu", {"x": 1}, priority=0) + assert isinstance(jid, str) and jid + job = backend.get(jid) + assert job is not None + assert job["status"] == STATUS_QUEUED + assert job["queue"] == "cpu" + assert job["payload"] == {"x": 1} + assert job["attempts"] == 0 + assert job["namespace"] == backend.namespace + + +def test_get_missing_returns_none(backend): + assert backend.get("does-not-exist") is None + + +# ── claim: exactly-once under contention ─────────────────────────────────────── + + +def test_claim_then_second_claim_is_none(backend): + backend.enqueue("cpu", {"n": 1}) + first = backend.claim("cpu", "w1", lease_s=30) + assert first is not None + assert first["status"] == STATUS_RUNNING + assert first["claimed_by"] == "w1" + assert first["attempts"] == 1 + assert backend.claim("cpu", "w2", lease_s=30) is None # nothing left + + +def test_claim_returns_distinct_jobs_never_double(backend): + ids = {backend.enqueue("cpu", {"i": i}) for i in range(5)} + claimed = [] + for _ in range(5): + job = backend.claim("cpu", "w", lease_s=30) + assert job is not None + claimed.append(job["id"]) + assert backend.claim("cpu", "w", lease_s=30) is None + assert set(claimed) == ids # every job claimed exactly once, no repeats + assert len(claimed) == len(set(claimed)) + + +def test_claim_respects_priority(backend): + backend.enqueue("cpu", {"p": "low"}, priority=0) + backend.enqueue("cpu", {"p": "high"}, priority=10) + job = backend.claim("cpu", "w", lease_s=30) + assert job["payload"]["p"] == "high" # higher priority first + + +def test_claim_isolated_per_queue(backend): + backend.enqueue("cpu", {"q": "cpu"}) + assert backend.claim("gpu", "w", lease_s=30) is None # wrong queue + + +def test_claim_exactly_once_under_thread_contention(backend): + """The keystone, proven under REAL contention: N jobs, many threads racing + to claim — every job is claimed exactly once, none twice (SKIP-LOCKED-equiv: + PG ``FOR UPDATE SKIP LOCKED`` / Redis ``ZPOPMIN`` in Lua / Mongo + ``find_one_and_update``).""" + import threading + + n = 40 + ids = {backend.enqueue("cpu", {"i": i}) for i in range(n)} + claimed: list[str] = [] + lock = threading.Lock() + + def drain(worker: str) -> None: + while True: + job = backend.claim("cpu", worker, lease_s=30) + if job is None: + return + with lock: + claimed.append(job["id"]) + + threads = [threading.Thread(target=drain, args=(f"w{t}",)) for t in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + assert sorted(claimed) == sorted(ids) # every job claimed + assert len(claimed) == len(set(claimed)) # and never twice + + +# ── lease renew + reclaim ────────────────────────────────────────────────────── + + +def test_renew_lease_only_by_owner(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w1", lease_s=30) + assert backend.renew_lease(job["id"], "w1", lease_s=60) is True + assert backend.renew_lease(job["id"], "someone-else", lease_s=60) is False + + +def test_reclaim_expired_requeues(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w1", lease_s=0) # already expired + assert job["status"] == STATUS_RUNNING + reclaimed = backend.reclaim_expired(queue="cpu") + assert job["id"] in reclaimed + again = backend.get(job["id"]) + assert again["status"] == STATUS_QUEUED + assert again["claimed_by"] in (None, "") + # …and it's claimable again + assert backend.claim("cpu", "w2", lease_s=30)["id"] == job["id"] + + +def test_reclaim_leaves_live_leases_alone(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w1", lease_s=300) # healthy lease + assert backend.reclaim_expired(queue="cpu") == [] + assert backend.get(job["id"])["status"] == STATUS_RUNNING + + +# ── idempotent terminals ─────────────────────────────────────────────────────── + + +def test_mark_completed_idempotent(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w", lease_s=30) + done = backend.mark_completed(job["id"], result={"ok": True}) + assert done is not None and done["status"] == STATUS_COMPLETED + assert done["result"] == {"ok": True} + assert backend.mark_completed(job["id"], result={"ok": False}) is None # no-op + assert backend.get(job["id"])["result"] == {"ok": True} # not clobbered + + +def test_mark_failed_idempotent(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w", lease_s=30) + failed = backend.mark_failed(job["id"], error="boom") + assert failed is not None and failed["status"] == STATUS_FAILED + assert backend.mark_failed(job["id"], error="again") is None + assert backend.mark_completed(job["id"]) is None # can't resurrect terminal + + +# ── atomic outbox (the keystone) ─────────────────────────────────────────────── + + +def test_complete_with_event_atomic_and_idempotent(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w", lease_s=30) + out = backend.complete_with_event(job["id"], "completed", result={"r": 1}) + assert out is not None and out["status"] == STATUS_COMPLETED + evs = backend.events() + assert len([e for e in evs if e["job_id"] == job["id"]]) == 1 + ev = [e for e in evs if e["job_id"] == job["id"]][0] + assert ev["event_type"] == "completed" + # second delivery: no transition, and crucially NO duplicate event + assert backend.complete_with_event(job["id"], "completed") is None + evs2 = backend.events() + assert len([e for e in evs2 if e["job_id"] == job["id"]]) == 1 + + +def test_fail_with_event_atomic(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w", lease_s=30) + out = backend.fail_with_event(job["id"], "failed", error="x", detail={"code": 75}) + assert out["status"] == STATUS_FAILED + evs = [e for e in backend.events() if e["job_id"] == job["id"]] + assert len(evs) == 1 and evs[0]["event_type"] == "failed" + assert evs[0]["detail"].get("code") == 75 + + +def test_events_are_ordered_and_filterable_by_seq(backend): + j1 = backend.enqueue("cpu", {}) + j2 = backend.enqueue("cpu", {}) + backend.fail_with_event(backend.claim("cpu", "w", lease_s=30)["id"], "failed") + first_seq = backend.events()[0]["seq"] + backend.complete_with_event(backend.claim("cpu", "w", lease_s=30)["id"], "completed") + after = backend.events(since=first_seq) + assert all(e["seq"] > first_seq for e in after) + assert len(after) == 1 + + +# ── watchdog re-queue (retry) ────────────────────────────────────────────────── + + +def test_requeue_for_retry_keeps_attempts_no_event(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w1", lease_s=300) + assert job["attempts"] == 1 + rq = backend.requeue_for_retry(job["id"]) + assert rq is not None and rq["status"] == STATUS_QUEUED + assert backend.events() == [] or all( + e["job_id"] != job["id"] for e in backend.events() + ) + # claimable again; attempts increments on the re-claim + again = backend.claim("cpu", "w2", lease_s=30) + assert again["id"] == job["id"] + assert again["attempts"] == 2 + + +def test_requeue_terminal_returns_none(backend): + backend.enqueue("cpu", {}) + job = backend.claim("cpu", "w", lease_s=30) + backend.mark_completed(job["id"]) + assert backend.requeue_for_retry(job["id"]) is None + + +# ── counts ────────────────────────────────────────────────────────────────────── + + +def test_counts(backend): + backend.enqueue("cpu", {}) + backend.enqueue("cpu", {}) + backend.enqueue("cpu", {}) + backend.mark_completed(backend.claim("cpu", "w", lease_s=30)["id"]) + c = backend.counts("cpu") + assert c["queued"] == 2 + assert c["running"] == 0 + assert c["completed"] == 1 + assert c["failed"] == 0 + + +# ── wake (best-effort NOTIFY / pub-sub / change-stream) ───────────────────────── + + +def test_wake_on_enqueue(backend): + with backend.subscribe("cpu") as sub: + time.sleep(0.2) # let the subscription establish (pub/sub has no backlog) + backend.enqueue("cpu", {"wake": 1}) + got = sub.wait(5.0) + assert got == "cpu" + + +# ── heartbeats + operator ON/OFF control ──────────────────────────────────────── + + +def test_heartbeat_and_workers(backend): + backend.heartbeat("hostA", "gpu", current_model="m1") + workers = backend.workers("gpu") + assert any(w.get("host") == "hostA" for w in workers) + + +def test_worker_control_default_on_then_off_then_on(backend): + assert backend.desired_state("hostA", "gpu") == "on" # absent ⇒ ON + backend.set_control("hostA", "gpu", desired_state="off", requested_by="ops") + assert backend.desired_state("hostA", "gpu") == "off" + backend.set_control("hostA", "gpu", desired_state="on") + assert backend.desired_state("hostA", "gpu") == "on" + # per-queue isolation: turning gpu off must not touch cpu + backend.set_control("hostA", "gpu", desired_state="off") + assert backend.desired_state("hostA", "cpu") == "on" + + +# ── multi-tenant isolation (the data-leakage guard) ───────────────────────────── + + +def test_namespace_isolation(backend): + """A second backend on a DIFFERENT namespace, SAME server + queue, must not + see / claim / count this namespace's jobs.""" + other = build_backend( + backend.name, url=backend.url, namespace=f"other_{uuid.uuid4().hex[:8]}" + ) + other.ensure_schema() + try: + jid = backend.enqueue("cpu", {"secret": 1}) + assert other.get(jid) is None + assert other.claim("cpu", "intruder", lease_s=30) is None + assert other.counts("cpu")["queued"] == 0 + # and the reverse: other's job is invisible here + ojid = other.enqueue("cpu", {}) + assert backend.get(ojid) is None + finally: + other.close()