52 changes: 44 additions & 8 deletions AGENTS.md

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.3.0] — 2026-05-27

### Added
- **Pluggable storage backends** — a `StorageBackend` SPI
(`queue_workflows.backends`) makes the durable-queue store selectable via
`configure(db_backend="pg"|"redis"|"mongodb")`, one provider per file. `pg`
(default) is byte-compatible and unchanged; `redis` (atomic claim/terminal via
Lua + pub/sub wake) and `mongodb` (`find_one_and_update` + multi-doc-txn outbox
+ change-stream wake, replica set) reproduce the same contract — claim
exactly-once, lease/reclaim, idempotent terminals, the atomic outbox, and
per-namespace tenant isolation — pinned by a parametrized contract suite run
against all three live servers. Optional drivers: `queue_workflows[redis]` /
`[mongodb]`. See `docs/storage_backends.md`.
- Operator worker ON/OFF control plane: a `worker_controls` table + a
per-`(host, queue)` `WorkerControlWatcher` that hard-stops (kill in-flight +
free RAM/VRAM) or parks a worker on command (migration 0012). See
52 changes: 44 additions & 8 deletions CLAUDE.md

Large diffs are not rendered by default.

30 changes: 29 additions & 1 deletion README.md
@@ -91,6 +91,9 @@ rationale.
detector (last-resort recovery from a GPU hardware hang).
- [`docs/worker_control.md`](docs/worker_control.md) — the operator ON/OFF
control plane (hard-stop vs park a `(host, queue)` worker).
- [`docs/storage_backends.md`](docs/storage_backends.md) — the pluggable
`StorageBackend` SPI (`pg` / `redis` / `mongodb`): the contract, the
capability matrix, and the integration boundary.

## Quick start

@@ -208,6 +211,29 @@ worker absent from `worker_controls` is treated as ON. See
[`docs/worker_control.md`](docs/worker_control.md) for the full design (the
`os._exit(79)` hard-stop contract and the extensible stop-policy seam).

## Pluggable storage backends (`pg` · `redis` · `mongodb`)

The queue store is selectable (since v0.3.0). Postgres is the default and the
reference path; `redis` and `mongodb` are **opt-in** providers that reproduce the
same durable-queue contract — claim exactly-once, lease/reclaim, idempotent
terminals, an **atomic outbox**, and per-namespace tenant isolation.

```python
import queue_workflows
queue_workflows.configure(db_backend="redis") # or "mongodb" / "pg" (default)
from queue_workflows.backends import get_backend
be = get_backend()
job_id = be.enqueue("cpu", {"task": "render"})
job = be.claim("cpu", worker="box-1", lease_s=30)
be.complete_with_event(job["id"], "completed", result={"ok": True})
```

`pip install 'queue_workflows[redis]'` / `'[mongodb]'` for the optional drivers
(mongo needs a replica set). This is **additive** — it does not move the existing
orchestrator/worker off Postgres. See
[`docs/storage_backends.md`](docs/storage_backends.md) for the design, the
capability matrix (and its caveats), and the integration boundary.

## Tests

```
@@ -217,4 +243,6 @@ QUEUE_WORKFLOWS_TEST_DB_URL=postgresql://user:pw@host:port/queue_workflows_test
```

The suite forces a `*_test` DB and applies the engine migration chain only. See
`tests/conftest.py`.
`tests/conftest.py`. The multi-backend contract suite additionally reads
`QUEUE_WORKFLOWS_TEST_REDIS_URL` / `QUEUE_WORKFLOWS_TEST_MONGO_URL` (each backend
skips if its server is unset/unreachable).
123 changes: 123 additions & 0 deletions docs/storage_backends.md
@@ -0,0 +1,123 @@
# Pluggable storage backends (`pg` · `redis` · `mongodb`)

`queue_workflows` is, by design, a **Postgres-as-queue** engine — its claim loop,
lease/reclaim, dispatch outbox and wake are written directly in SQL. That stays
the reference path. This document describes the **`StorageBackend` SPI**, an
additive seam (since v0.3.0) that makes the *queue storage* selectable so the
same durable-queue semantics can run on Redis or MongoDB.

```python
import queue_workflows
queue_workflows.configure(db_backend="redis") # or "mongodb", or "pg" (default)
from queue_workflows.backends import get_backend
be = get_backend() # bound to config.db_namespace
jid = be.enqueue("cpu", {"task": "render"})
job = be.claim("cpu", worker="box-1", lease_s=30)
be.complete_with_event(job["id"], "completed", result={"ok": True})
```

Selecting `pg` (the default) imports **nothing** new and changes nothing — the
seam is opt-in and the legacy engine is untouched.

## What the SPI is — and is not

The port (`queue_workflows/backends/base.py`, `StorageBackend`) is a **generic
durable work-queue with a transactional outbox**:

- `enqueue` / `claim` (exactly-once) / `renew_lease` / `reclaim_expired` / `requeue_for_retry`
- idempotent terminals (`mark_completed` / `mark_failed`) and the **atomic outbox**
(`complete_with_event` / `fail_with_event` — go terminal *and* append the event,
both-or-neither)
- best-effort wake (`notify` / `subscribe`), `heartbeat` / `workers`, and the
operator `set_control` / `desired_state` (ON/OFF, default-ON)

It is deliberately **not leaky**: no method takes or returns a driver handle
(psycopg cursor, redis pipeline, pymongo session). Each backend is **bound to one
namespace** and scopes every key/row/collection by it, so two tenants on one
server are isolated.
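
To make the contract concrete, here is a minimal in-memory sketch of the port's shape. `ToyBackend` and everything inside it are hypothetical and are not the library's `StorageBackend`; only the method names and call shapes mirror the example at the top of this document. The boolean return of `complete_with_event` and the `priority` argument are assumptions made for illustration, and a single lock stands in for the transaction or Lua script a real backend would use.

```python
import itertools
import threading
import time


class ToyBackend:
    """Hypothetical in-memory stand-in for the SPI; illustration only."""

    TERMINAL = {"completed", "failed"}

    def __init__(self, namespace="default"):
        self.namespace = namespace
        self._lock = threading.Lock()
        self._seq = itertools.count(1)
        self._jobs = {}   # job id -> job dict
        self.events = []  # the outbox, appended together with the terminal write

    def enqueue(self, queue, payload, priority=0):
        with self._lock:
            jid = next(self._seq)
            self._jobs[jid] = {
                "id": jid, "queue": queue, "payload": payload,
                "priority": priority, "status": "queued",
                "worker": None, "lease_expires_at": None,
            }
            return jid

    def claim(self, queue, worker, lease_s):
        # Select and mark "running" under one lock, so no two callers
        # can ever receive the same job.
        with self._lock:
            ready = [j for j in self._jobs.values()
                     if j["queue"] == queue and j["status"] == "queued"]
            if not ready:
                return None
            job = min(ready, key=lambda j: (-j["priority"], j["id"]))
            job.update(status="running", worker=worker,
                       lease_expires_at=time.monotonic() + lease_s)
            return dict(job)

    def complete_with_event(self, job_id, event_type, result=None):
        # Terminal transition and outbox append happen together or not at all.
        with self._lock:
            job = self._jobs[job_id]
            if job["status"] in self.TERMINAL:
                return False  # idempotent: a repeat call appends no second event
            job["status"] = "completed"
            job["result"] = result
            self.events.append({"job_id": job_id, "type": event_type})
            return True


be = ToyBackend()
jid = be.enqueue("cpu", {"task": "render"})
job = be.claim("cpu", worker="box-1", lease_s=30)
first = be.complete_with_event(job["id"], "completed", result={"ok": True})
second = be.complete_with_event(job["id"], "completed", result={"ok": True})
```

The second `complete_with_event` call is a no-op, which is the behaviour the "idempotent terminals" and "no duplicate on re-delivery" guarantees describe.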

> **Integration boundary (read this).** The SPI is **additive**. Selecting
> `redis`/`mongodb` does **not** re-home the existing orchestrator / claim-worker
> / dispatcher — those still run on Postgres. v0.3.0 ships the SPI + three
> backends that pass an identical contract suite; wiring the DAG orchestrator
> end-to-end onto a non-PG backend is a later milestone. Use the SPI today as a
> standalone pluggable durable queue.

## How each backend reproduces the contract

| Guarantee | `pg` | `redis` | `mongodb` |
|---|---|---|---|
| Claim exactly-once | `FOR UPDATE SKIP LOCKED` | `ZPOPMIN` inside a **Lua** script | `find_one_and_update` |
| Priority + FIFO | `ORDER BY priority DESC, created_at` | sorted set, score `-priority`, FIFO tiebreak | sort `priority desc, seq asc` |
| Lease / reclaim | lease column + `UPDATE … lease < now()` | per-queue running ZSET scored by expiry | `lease_expires_at < now` sweep |
| Idempotent terminal | `UPDATE … WHERE status NOT IN (terminal) RETURNING *` | Lua status guard | `find_one_and_update` status guard |
| **Atomic outbox** | one **transaction** | one **Lua script** | one **multi-doc transaction** |
| Wake | `LISTEN` / `pg_notify` (in-txn) | **pub/sub** (fire-and-forget) | **change stream** on a capped coll. |
| Namespace isolation | `namespace` column | key prefix `qw:<ns>:` | one **database** per namespace |
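
The "Priority + FIFO" row reduces to one ordering rule: higher priority first, then insertion order. The sketch below shows that rule with a heap keyed on `(-priority, seq)`. It is an illustration of the ordering only; how each backend actually encodes the tiebreak (for example inside a Redis sorted-set score) is not shown in this document, and the `push` / `pop` helpers are hypothetical.

```python
import heapq
import itertools

_seq = itertools.count()


def push(heap, job_id, priority):
    # Smaller tuples pop first, so negate the priority (higher wins) and
    # use a monotonically increasing sequence number as the FIFO tiebreak.
    heapq.heappush(heap, (-priority, next(_seq), job_id))


def pop(heap):
    return heapq.heappop(heap)[2]


heap = []
push(heap, "a", priority=0)
push(heap, "b", priority=5)
push(heap, "c", priority=5)
push(heap, "d", priority=0)
order = [pop(heap) for _ in range(4)]
print(order)  # ['b', 'c', 'a', 'd']
```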

### Caveats (be honest about the weakenings)

- **Redis** has no cross-key ACID transaction; atomicity comes from **Lua**
(single server-side step), so this targets a **single Redis instance**, not
Cluster (Cluster needs all keys in one hash slot). The wake is pub/sub —
fire-and-forget — so a subscriber that is down misses it; the worker's safety
poll covers that, exactly as it does behind PG `LISTEN`.
- **MongoDB** transactions *and* change streams require a **replica set** (a
single-node RS is fine); on a standalone `mongod`, `complete_with_event` /
`fail_with_event` and the wake will fail loudly. `ensure_schema()` pings on
connect.
- **pg** computes `counts()` from the live `status`, so it can never drift;
redis keeps maintained counters (decremented against the *prior* status — see
the audit below); mongo counts live documents.
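
The wake caveat relies on a safety poll: a worker waits for a notification but never longer than a fixed interval, so a missed wake only delays work instead of losing it. A minimal sketch of that pattern follows, using a `threading.Event` in place of a real subscription. The function name and the intervals are hypothetical; the library's actual worker loop is not shown here.

```python
import threading


def wait_for_work(wake: threading.Event, poll_interval_s: float) -> str:
    """Block until woken or until the safety poll fires; report which happened."""
    woken = wake.wait(timeout=poll_interval_s)
    wake.clear()
    return "wake" if woken else "poll"


wake = threading.Event()

# No notification arrives (e.g. the subscriber was down when it was sent):
# the poll interval still bounds how long the worker sleeps.
missed = wait_for_work(wake, poll_interval_s=0.05)

# A notification arrives: the worker returns early instead of waiting 5 s.
threading.Timer(0.01, wake.set).start()
notified = wait_for_work(wake, poll_interval_s=5.0)
```

In either case the worker goes on to attempt a claim; the wake only shortens latency and is never the source of truth.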

## Configuration

| `configure(...)` key | env (default) | meaning |
|---|---|---|
| `db_backend` | — | `"pg"` (default) / `"redis"` / `"mongodb"` (aliases `postgres`, `mongo`) |
| `db_namespace` | — | tenant scope on a shared server (`""` ⇒ `"default"`) |
| — | `QUEUE_WORKFLOWS_REDIS_URL` | redis DSN (`redis_url_env` renames it) |
| — | `QUEUE_WORKFLOWS_MONGO_URL` | mongo DSN, incl. `?replicaSet=…` / `directConnection=true` |

Install the optional driver: `pip install 'queue_workflows[redis]'` or
`'queue_workflows[mongodb]'`. Selecting a backend whose driver is missing raises
a clear `ImportError` naming the extra.

## Tests

`tests/test_backend_contract.py` is one parametrized suite run against **all three
live servers** (a backend whose server is unreachable is skipped, not failed):

```bash
docker run -d --name qw_pg -e POSTGRES_PASSWORD=postgres -p 5433:5432 postgres:16
docker run -d --name qw_redis -p 6380:6379 redis:7
docker run -d --name qw_mongo -p 27018:27017 mongo:7 --replSet rs0 --bind_ip_all
docker exec qw_mongo mongosh --quiet --eval 'rs.initiate()'

export QUEUE_WORKFLOWS_TEST_DB_URL="postgresql://postgres:postgres@localhost:5433/queue_workflows_test"
export QUEUE_WORKFLOWS_TEST_REDIS_URL="redis://localhost:6380/0"
export QUEUE_WORKFLOWS_TEST_MONGO_URL="mongodb://localhost:27018/?directConnection=true"
python -m pytest tests/test_backend_contract.py
```

The suite pins claim-exactly-once (incl. an 8-thread contention stress test),
lease/renew/reclaim, idempotent terminals, the atomic outbox (both-or-neither +
no duplicate on re-delivery), the wake, heartbeat/control, and cross-namespace
isolation.
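
For readers who want the shape of the contention check without the live servers, here is a self-contained sketch against a toy lock-guarded queue. It is not the contents of `tests/test_backend_contract.py`; `ToyQueue` and `run_contention` are hypothetical names, and the job count is arbitrary. The assertion idea is the same: with several threads claiming concurrently, every job must be handed out exactly once.

```python
import threading
from collections import Counter, deque


class ToyQueue:
    """Hypothetical queue whose claim is atomic because of a single lock."""

    def __init__(self, job_ids):
        self._ready = deque(job_ids)
        self._lock = threading.Lock()

    def claim(self):
        with self._lock:
            return self._ready.popleft() if self._ready else None


def run_contention(n_jobs=200, n_threads=8):
    q = ToyQueue(range(n_jobs))
    claimed = Counter()
    claimed_lock = threading.Lock()
    start = threading.Barrier(n_threads)  # release all claimers together

    def worker():
        start.wait()
        while (jid := q.claim()) is not None:
            with claimed_lock:
                claimed[jid] += 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return claimed


claimed = run_contention()
```

A claim count other than 1 for any job id, or a missing id, would indicate a broken claim path.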

## Audit (v0.3.0)

The SPI was audited across design / regressions / safety / internal data leakage:

- **A1 (safety, fixed).** Redis `counts()` used maintained counters that
decremented `:running` unconditionally; a terminal/requeue applied to a job
that wasn't `running` would drift them. Fixed: the Lua now decrements the
job's **prior** status. (pg/mongo derive counts from live status — immune.)
- **A2 (safety, added).** Added a multi-threaded contention test (8 threads) that
exercises claim-exactly-once on all three backends under concurrent claimers, not just sequentially.
- **A3 (design/leakage).** The SPI is additive (see *Integration boundary*); it
does not silently re-home the engine. No driver objects appear in the port, so
PG internals can't leak into redis/mongo call sites. Cross-namespace
claim/read/count/wake are isolated and tested.
- **Regressions:** none — the full engine suite (415 tests) stays green; the new
drivers are imported lazily so a `pg`-only deploy needs neither installed.
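
The drift described in A1 is easy to reproduce in plain Python. The sketch below models maintained per-status counters and compares an unconditional decrement of `running` with a decrement of the job's prior status. It is a simulation under assumed names (`finish_buggy`, `finish_fixed`, `scenario`); the real fix lives in the Redis Lua script, which is not reproduced here.

```python
from collections import Counter


def finish_buggy(counts, status, job_id, new_status):
    # Pre-fix behaviour as described in A1: always decrement "running".
    counts["running"] -= 1
    counts[new_status] += 1
    status[job_id] = new_status


def finish_fixed(counts, status, job_id, new_status):
    # Post-fix behaviour: decrement whatever status the job actually had.
    prior = status[job_id]
    counts[prior] -= 1
    counts[new_status] += 1
    status[job_id] = new_status


def scenario(finish):
    status = {"j1": "queued", "j2": "running"}
    counts = Counter(queued=1, running=1)
    finish(counts, status, "j1", "failed")  # terminal applied to a queued job
    return dict(counts)


buggy = scenario(finish_buggy)  # {'queued': 1, 'running': 0, 'failed': 1}
fixed = scenario(finish_fixed)  # {'queued': 0, 'running': 1, 'failed': 1}
```

In the buggy run the counters claim one queued job and zero running jobs, while the actual state is the opposite; the fixed run matches the live statuses.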
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -26,7 +26,9 @@ dependencies = [

[project.optional-dependencies]
metrics = ["psutil"] # hw_metrics CPU/RAM probe (GPU probe shells out, no dep)
test = ["pytest", "pytest-cov"]
redis = ["redis>=5"] # the 'redis' StorageBackend (queue_workflows.backends.redis)
mongodb = ["pymongo>=4.4"] # the 'mongodb' StorageBackend (needs a replica set)
test = ["pytest", "pytest-cov", "redis>=5", "pymongo>=4.4"]

[project.scripts]
queue-claim-worker = "queue_workflows.claim_worker:main" # generic CLIs (used standalone / other projects)
10 changes: 10 additions & 0 deletions queue_workflows/__init__.py
@@ -67,6 +67,8 @@ def configure(
    container_prefix: str | None = None,
    ingest_queues: frozenset[str] | None = None,
    ingest_default_budget_s: int | None = None,
    db_backend: str | None = None,
    db_namespace: str | None = None,
) -> EngineConfig:
    """Set engine configuration values. Only the passed keyword args are
    mutated; the rest keep their (ai_leads-byte-compatible) defaults. Returns
@@ -101,6 +103,14 @@ def configure(
        cfg.ingest_queues = iq
    if ingest_default_budget_s is not None:
        cfg.ingest_default_budget_s = int(ingest_default_budget_s)
    if db_backend is not None:
        # Validate against the backend registry (source of truth), imported
        # lazily so the package root stays import-light and config a leaf.
        from queue_workflows.backends import canonical_backend_name

        cfg.db_backend = canonical_backend_name(db_backend)
    if db_namespace is not None:
        cfg.db_namespace = str(db_namespace)
    return cfg


149 changes: 149 additions & 0 deletions queue_workflows/backends/__init__.py
@@ -0,0 +1,149 @@
"""Storage-backend registry — resolve ``config.db_backend`` to a concrete
:class:`~queue_workflows.backends.base.StorageBackend`.

Three providers, one file each: ``postgres`` (the
reference/default), ``redis``, ``mongodb``. Provider modules are imported
**lazily** — selecting ``pg`` never imports ``redis``/``pymongo``, so those stay
*optional* dependencies (``pip install 'queue_workflows[redis]'`` /
``[mongodb]``) and a pg-only deploy needs neither installed.

Aliases collapse to a canonical name (``postgres``/``postgresql`` → ``pg``;
``mongo`` → ``mongodb``) so ``configure(db_backend=...)`` is forgiving but the
rest of the code only ever sees the canonical form.
"""

from __future__ import annotations

import os
import threading

from queue_workflows.backends.base import (
    Event,
    Job,
    StorageBackend,
    WakeListener,
)
from queue_workflows.config import get_config

__all__ = [
    "StorageBackend",
    "Job",
    "Event",
    "WakeListener",
    "canonical_backend_name",
    "is_known_backend",
    "known_backends",
    "build_backend",
    "get_backend",
    "close_all",
]

#: alias → canonical name. The canonical set is the source of truth ``configure``
#: validates against.
_ALIASES = {
    "pg": "pg",
    "postgres": "pg",
    "postgresql": "pg",
    "redis": "redis",
    "mongo": "mongodb",
    "mongodb": "mongodb",
}

# Dotted path to each provider class, imported only when first selected.
_PROVIDERS = {
    "pg": ("queue_workflows.backends.postgres", "PostgresBackend"),
    "redis": ("queue_workflows.backends.redis", "RedisBackend"),
    "mongodb": ("queue_workflows.backends.mongodb", "MongoBackend"),
}

_instances: dict[tuple[str, str, str], StorageBackend] = {}
_lock = threading.RLock()


def known_backends() -> frozenset[str]:
    """The canonical backend names (``{"pg", "redis", "mongodb"}``)."""
    return frozenset(_PROVIDERS)


def canonical_backend_name(name: str) -> str:
    """Normalize ``name`` to its canonical form, or raise ``ValueError`` listing
    the valid names. Used by ``configure(db_backend=...)`` to fail fast."""
    key = (name or "").strip().lower()
    if key not in _ALIASES:
        raise ValueError(
            f"unknown db_backend {name!r}; valid: "
            f"{sorted(set(_ALIASES))} (canonical {sorted(known_backends())})"
        )
    return _ALIASES[key]


def is_known_backend(name: str) -> bool:
    return (name or "").strip().lower() in _ALIASES


def _load_provider(canonical: str) -> type[StorageBackend]:
    module_path, cls_name = _PROVIDERS[canonical]
    import importlib

    try:
        module = importlib.import_module(module_path)
    except ImportError as exc:  # missing optional driver
        extra = {"redis": "redis", "mongodb": "mongodb"}.get(canonical, canonical)
        raise ImportError(
            f"the {canonical!r} backend needs its driver: "
            f"pip install 'queue_workflows[{extra}]' ({exc})"
        ) from exc
    return getattr(module, cls_name)


def build_backend(name: str, *, url: str, namespace: str = "") -> StorageBackend:
    """Construct a backend explicitly (no config / no caching). The seam tests
    use this to point each provider at its dockerized server + a namespace."""
    cls = _load_provider(canonical_backend_name(name))
    return cls(url=url, namespace=namespace)


def _url_for(canonical: str) -> str:
    cfg = get_config()
    if canonical == "pg":
        from queue_workflows.db import db_url

        return db_url()
    env_name = cfg.redis_url_env if canonical == "redis" else cfg.mongo_url_env
    url = os.environ.get(env_name)
    if not url:
        raise RuntimeError(
            f"db_backend={canonical!r} but {env_name} is not set; "
            f"export the {canonical} DSN there (or pass a different env via "
            f"configure({'redis_url_env' if canonical == 'redis' else 'mongo_url_env'}=...))."
        )
    return url


def get_backend(*, namespace: str | None = None) -> StorageBackend:
    """Return the process-wide backend for the configured ``db_backend`` +
    namespace, building (and caching) it on first use. Cached per
    ``(backend, namespace, url)`` so repeated calls reuse one client/pool."""
    cfg = get_config()
    canonical = canonical_backend_name(cfg.db_backend)
    ns = namespace if namespace is not None else cfg.db_namespace
    url = _url_for(canonical)
    key = (canonical, ns or "", url)
    with _lock:
        be = _instances.get(key)
        if be is None:
            be = _load_provider(canonical)(url=url, namespace=ns or "")
            be.ensure_schema()
            _instances[key] = be
        return be


def close_all() -> None:
    """Close + drop every cached backend (orchestrator shutdown / test teardown)."""
    with _lock:
        for be in _instances.values():
            try:
                be.close()
            except Exception:  # teardown best-effort
                pass
        _instances.clear()