Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on

`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts). Two dispatch modes:

- **Sync (default, `run_loops=False`)**: `broker.publish` synchronously routes through `OutboxSubscriber.dispatch_one` — matches the FastStream test-broker idiom (`TestKafkaBroker` / `TestRabbitBroker`). The handler runs before `publish` returns; no background loops. `broker.publish_batch`, `cancel_timer`, and `fetch_unprocessed` are also patched to operate on the fake client (the `session` argument is ignored). Future-dated rows (`activate_in`/`activate_at`) intentionally are *not* dispatched — they sit in the fake client, matching production where they wait for the gate.
- **Sync (default, `run_loops=False`)**: `broker.publish` synchronously routes through `OutboxSubscriber.dispatch_one` — matches the FastStream test-broker idiom (`TestKafkaBroker` / `TestRabbitBroker`). The handler runs before `publish` returns; no background loops. `broker.publish_batch`, `cancel_timer`, and `fetch_unprocessed` are also patched to operate on the fake client (the `session` argument is ignored). Future-dated rows (`activate_in`/`activate_at`) fire **immediately** in sync mode — sync dispatch ignores `next_attempt_at`. This trades production parity for test ergonomics: tests can assert handler effects without time travel. `next_attempt_at` is still recorded on the fake row for inspection. Use `run_loops=True` if you need scheduled delivery to actually wait.
- **Loop (`run_loops=True`)**: spins up the real `_fetch_loop` / `_worker_loop` against the fake client. Required for tests that exercise retry rescheduling, lease-expiry reclaim, fetch-loop error recovery, or scheduled delivery firing. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`).

`OutboxSubscriber.dispatch_one(row)` is the public per-row dispatch entry point. The worker loop calls it; the test broker calls it directly. Caller must have already acquired the row's lease.
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,37 @@ await broker.cancel_timer(queue="orders", timer_id="order-confirm-42", session=s

**Latency floor:** firing latency is bounded by the subscriber's `max_fetch_interval` (default 10s) after `next_attempt_at` elapses. Lower it for sub-10s precision; sub-second precision is not a goal of this broker.

*In tests using `TestOutboxBroker` (default sync mode), `activate_in` / `activate_at` are ignored and timers fire immediately — see [Testing](#testing).*

## Testing

`TestOutboxBroker` (in `faststream_outbox.testing`) swaps the SQLAlchemy-backed client for an in-memory fake so unit tests don't need Postgres. By default it dispatches handlers **synchronously inside `publish`** — matching `TestKafkaBroker` / `TestRabbitBroker`. No `_wait_until`, no `sleep`.

```python
from faststream_outbox.testing import TestOutboxBroker

async def test_handler() -> None:
received: list[int] = []

@broker.subscriber("orders")
async def handle(order_id: int) -> None:
received.append(order_id)

async with TestOutboxBroker(broker):
await broker.publish(1, queue="orders")
# Handler has already run.
assert received == [1]
```

Sync mode ignores `activate_in` / `activate_at` — **timers fire immediately**, so straight-line tests work for scheduled publishes without waiting on wall clock. The schedule is still recorded on the fake row (`broker.fake_client.rows[0].next_attempt_at`) if a test needs to assert on it. `cancel_timer` still works for queues without a registered handler.

For tests that need real polling semantics — retry rescheduling, lease expiry / reclaim, `_fetch_loop` error recovery, or honoring `activate_in` delays — opt in to the loop-driven mode:

```python
async with TestOutboxBroker(broker, run_loops=True):
... # use feed() / _wait_until to drive the real loops
```

## Schema validation

Schema validation is opt-in:
Expand Down
22 changes: 11 additions & 11 deletions faststream_outbox/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
``TestOutboxBroker`` wraps an ``OutboxBroker`` and swaps in a ``FakeOutboxClient``
backed by a list of dicts. Defaults to **sync dispatch**: ``await broker.publish(...)``
finds the matching subscriber and awaits its consume pipeline before returning, the
same model as ``TestKafkaBroker`` / ``TestRabbitBroker``. Pass ``run_loops=True`` to
restore the loop-driven behavior — the real ``_fetch_loop`` / ``_worker_loop`` run
against the in-memory client; required for tests that exercise retry rescheduling,
lease expiry, or fetch-loop error recovery. ``feed()`` simulates a row insert.
same model as ``TestKafkaBroker`` / ``TestRabbitBroker``. Timers fire immediately in
sync mode — ``activate_in`` / ``activate_at`` are recorded on the fake row but not
honored. Pass ``run_loops=True`` to restore the loop-driven behavior — the real
``_fetch_loop`` / ``_worker_loop`` run against the in-memory client; required for
tests that exercise retry rescheduling, lease expiry, fetch-loop error recovery,
or scheduled delivery actually waiting. ``feed()`` simulates a row insert.
"""

import datetime as _dt
Expand Down Expand Up @@ -264,11 +266,10 @@ async def fake_publish( # noqa: PLR0913
next_attempt_at=next_at,
timer_id=timer_id,
)
# Sync dispatch only when:
# - loop mode is off (loops would re-dispatch the row otherwise),
# - the insert wasn't a timer-dedup no-op,
# - the row is eligible now (future-dated rows wait for their gate, same as production).
if not run_loops and row_id is not None and (next_at is None or next_at <= _utcnow()):
# Sync dispatch ignores next_attempt_at — timers fire immediately in test mode.
# Skip only when loop mode is on (loops would re-dispatch) or the insert was a
# timer-dedup no-op.
if not run_loops and row_id is not None:
await _sync_dispatch(fake_client, broker, queue, row_id)
return row_id

Expand Down Expand Up @@ -301,7 +302,6 @@ async def fake_publish_batch(
next_at = _utcnow() + activate_in
elif activate_at is not None:
next_at = activate_at
eligible_now = next_at is None or next_at <= _utcnow()
for body in bodies:
payload, hdrs = _encode_payload(body, headers=headers, serializer=serializer)
row_id = fake_client.feed(
Expand All @@ -310,7 +310,7 @@ async def fake_publish_batch(
headers=hdrs,
next_attempt_at=next_at,
)
if not run_loops and row_id is not None and eligible_now:
if not run_loops and row_id is not None:
await _sync_dispatch(fake_client, broker, queue, row_id)

return fake_publish_batch
Expand Down
34 changes: 17 additions & 17 deletions tests/test_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ async def test_fake_broker_publish_with_timer_id_dedups() -> None:
assert len(test_broker.fake_client.rows) == 1


async def test_fake_broker_publish_with_activate_in_skips_sync_dispatch() -> None:
"""Future-dated rows are not dispatched in sync mode — they wait for the gate (production parity)."""
async def test_fake_broker_publish_with_activate_in_dispatches_immediately() -> None:
"""Sync mode ignores activate_in — timers fire immediately."""
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders")
async def handle(body: str) -> None:
received.append(body) # pragma: no cover # handler must not fire for future-dated rows
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
Expand All @@ -182,34 +182,34 @@ async def handle(body: str) -> None:
activate_in=_dt.timedelta(seconds=5),
)

assert received == []
assert len(test_broker.fake_client.rows) == 1
assert received == ["delayed"]
assert test_broker.fake_client.rows == []


async def test_fake_broker_publish_with_activate_at_skips_sync_dispatch() -> None:
async def test_fake_broker_publish_with_activate_at_dispatches_immediately() -> None:
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders")
async def handle(body: str) -> None:
received.append(body) # pragma: no cover
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=5)
await broker.publish("at-future", queue="orders", activate_at=future) # ty: ignore[missing-argument]

assert received == []
assert len(test_broker.fake_client.rows) == 1
assert received == ["at-future"]
assert test_broker.fake_client.rows == []


async def test_fake_broker_publish_batch_with_activate_in_skips_sync_dispatch() -> None:
async def test_fake_broker_publish_batch_with_activate_in_dispatches_immediately() -> None:
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders")
async def handle(body: str) -> None:
received.append(body) # pragma: no cover
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
Expand All @@ -220,25 +220,25 @@ async def handle(body: str) -> None:
activate_in=_dt.timedelta(seconds=5),
)

assert received == []
assert len(test_broker.fake_client.rows) == 2
assert received == ["a", "b"]
assert test_broker.fake_client.rows == []


async def test_fake_broker_publish_batch_with_activate_at_skips_sync_dispatch() -> None:
async def test_fake_broker_publish_batch_with_activate_at_dispatches_immediately() -> None:
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders")
async def handle(body: str) -> None:
received.append(body) # pragma: no cover
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=5)
await broker.publish_batch("a", "b", queue="orders", activate_at=future) # ty: ignore[missing-argument]

assert received == []
assert len(test_broker.fake_client.rows) == 2
assert received == ["a", "b"]
assert test_broker.fake_client.rows == []


async def test_fake_broker_publish_rejects_both_activate_in_and_activate_at() -> None:
Expand Down
Loading