From 685ed90f9df1e5554d2f81204ffb60d888de76f1 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 21 May 2026 11:15:30 +0300 Subject: [PATCH] feat: fire timers immediately in TestOutboxBroker sync mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sync dispatch now ignores activate_in / activate_at — handlers run before publish returns regardless of next_attempt_at. Trades production parity for test ergonomics; run_loops=True still honors scheduling for tests that need it. Adds a Testing section to README documenting both modes. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 2 +- README.md | 31 +++++++++++++++++++++++++++++++ faststream_outbox/testing.py | 22 +++++++++++----------- tests/test_fake.py | 34 +++++++++++++++++----------------- 4 files changed, 60 insertions(+), 29 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 38a5466..7fdac4f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -62,7 +62,7 @@ Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on `TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts). Two dispatch modes: -- **Sync (default, `run_loops=False`)**: `broker.publish` synchronously routes through `OutboxSubscriber.dispatch_one` — matches the FastStream test-broker idiom (`TestKafkaBroker` / `TestRabbitBroker`). The handler runs before `publish` returns; no background loops. `broker.publish_batch`, `cancel_timer`, and `fetch_unprocessed` are also patched to operate on the fake client (the `session` argument is ignored). Future-dated rows (`activate_in`/`activate_at`) intentionally are *not* dispatched — they sit in the fake client, matching production where they wait for the gate. +- **Sync (default, `run_loops=False`)**: `broker.publish` synchronously routes through `OutboxSubscriber.dispatch_one` — matches the FastStream test-broker idiom (`TestKafkaBroker` / `TestRabbitBroker`). The handler runs before `publish` returns; no background loops. `broker.publish_batch`, `cancel_timer`, and `fetch_unprocessed` are also patched to operate on the fake client (the `session` argument is ignored). Future-dated rows (`activate_in`/`activate_at`) fire **immediately** in sync mode — sync dispatch ignores `next_attempt_at`. This trades production parity for test ergonomics: tests can assert handler effects without time travel. `next_attempt_at` is still recorded on the fake row for inspection. Use `run_loops=True` if you need scheduled delivery to actually wait. - **Loop (`run_loops=True`)**: spins up the real `_fetch_loop` / `_worker_loop` against the fake client. Required for tests that exercise retry rescheduling, lease-expiry reclaim, fetch-loop error recovery, or scheduled delivery firing. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`). `OutboxSubscriber.dispatch_one(row)` is the public per-row dispatch entry point. The worker loop calls it; the test broker calls it directly. Caller must have already acquired the row's lease. diff --git a/README.md b/README.md index bfaad78..7bef2d2 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,37 @@ await broker.cancel_timer(queue="orders", timer_id="order-confirm-42", session=s **Latency floor:** firing latency is bounded by the subscriber's `max_fetch_interval` (default 10s) after `next_attempt_at` elapses. Lower it for sub-10s precision; sub-second precision is not a goal of this broker. +*In tests using `TestOutboxBroker` (default sync mode), `activate_in` / `activate_at` are ignored and timers fire immediately — see [Testing](#testing).* + +## Testing + +`TestOutboxBroker` (in `faststream_outbox.testing`) swaps the SQLAlchemy-backed client for an in-memory fake so unit tests don't need Postgres. By default it dispatches handlers **synchronously inside `publish`** — matching `TestKafkaBroker` / `TestRabbitBroker`. No `_wait_until`, no `sleep`. + +```python +from faststream_outbox.testing import TestOutboxBroker + +async def test_handler() -> None: + received: list[int] = [] + + @broker.subscriber("orders") + async def handle(order_id: int) -> None: + received.append(order_id) + + async with TestOutboxBroker(broker): + await broker.publish(1, queue="orders") + # Handler has already run. + assert received == [1] +``` + +Sync mode ignores `activate_in` / `activate_at` — **timers fire immediately**, so straight-line tests work for scheduled publishes without waiting on wall clock. The schedule is still recorded on the fake row (`broker.fake_client.rows[0].next_attempt_at`) if a test needs to assert on it. `cancel_timer` still works for queues without a registered handler. + +For tests that need real polling semantics — retry rescheduling, lease expiry / reclaim, `_fetch_loop` error recovery, or honoring `activate_in` delays — opt in to the loop-driven mode: + +```python +async with TestOutboxBroker(broker, run_loops=True): + ... # use feed() / _wait_until to drive the real loops +``` + ## Schema validation Schema validation is opt-in: diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index ce88b76..7603c1b 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -4,10 +4,12 @@ ``TestOutboxBroker`` wraps an ``OutboxBroker`` and swaps in a ``FakeOutboxClient`` backed by a list of dicts. Defaults to **sync dispatch**: ``await broker.publish(...)`` finds the matching subscriber and awaits its consume pipeline before returning, the -same model as ``TestKafkaBroker`` / ``TestRabbitBroker``. Pass ``run_loops=True`` to -restore the loop-driven behavior — the real ``_fetch_loop`` / ``_worker_loop`` run -against the in-memory client; required for tests that exercise retry rescheduling, -lease expiry, or fetch-loop error recovery. ``feed()`` simulates a row insert. +same model as ``TestKafkaBroker`` / ``TestRabbitBroker``. Timers fire immediately in +sync mode — ``activate_in`` / ``activate_at`` are recorded on the fake row but not +honored. Pass ``run_loops=True`` to restore the loop-driven behavior — the real +``_fetch_loop`` / ``_worker_loop`` run against the in-memory client; required for +tests that exercise retry rescheduling, lease expiry, fetch-loop error recovery, +or scheduled delivery actually waiting. ``feed()`` simulates a row insert. """ import datetime as _dt @@ -264,11 +266,10 @@ async def fake_publish( # noqa: PLR0913 next_attempt_at=next_at, timer_id=timer_id, ) - # Sync dispatch only when: - # - loop mode is off (loops would re-dispatch the row otherwise), - # - the insert wasn't a timer-dedup no-op, - # - the row is eligible now (future-dated rows wait for their gate, same as production). - if not run_loops and row_id is not None and (next_at is None or next_at <= _utcnow()): + # Sync dispatch ignores next_attempt_at — timers fire immediately in test mode. + # Skip only when loop mode is on (loops would re-dispatch) or the insert was a + # timer-dedup no-op. + if not run_loops and row_id is not None: await _sync_dispatch(fake_client, broker, queue, row_id) return row_id @@ -301,7 +302,6 @@ async def fake_publish_batch( next_at = _utcnow() + activate_in elif activate_at is not None: next_at = activate_at - eligible_now = next_at is None or next_at <= _utcnow() for body in bodies: payload, hdrs = _encode_payload(body, headers=headers, serializer=serializer) row_id = fake_client.feed( @@ -310,7 +310,7 @@ async def fake_publish_batch( headers=hdrs, next_attempt_at=next_at, ) - if not run_loops and row_id is not None and eligible_now: + if not run_loops and row_id is not None: await _sync_dispatch(fake_client, broker, queue, row_id) return fake_publish_batch diff --git a/tests/test_fake.py b/tests/test_fake.py index 76aaef8..259ba5e 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -165,14 +165,14 @@ async def test_fake_broker_publish_with_timer_id_dedups() -> None: assert len(test_broker.fake_client.rows) == 1 -async def test_fake_broker_publish_with_activate_in_skips_sync_dispatch() -> None: - """Future-dated rows are not dispatched in sync mode — they wait for the gate (production parity).""" +async def test_fake_broker_publish_with_activate_in_dispatches_immediately() -> None: + """Sync mode ignores activate_in — timers fire immediately.""" broker = _make_broker() received: list[str] = [] @broker.subscriber("orders") async def handle(body: str) -> None: - received.append(body) # pragma: no cover # handler must not fire for future-dated rows + received.append(body) test_broker = TestOutboxBroker(broker) async with test_broker: @@ -182,34 +182,34 @@ async def handle(body: str) -> None: activate_in=_dt.timedelta(seconds=5), ) - assert received == [] - assert len(test_broker.fake_client.rows) == 1 + assert received == ["delayed"] + assert test_broker.fake_client.rows == [] -async def test_fake_broker_publish_with_activate_at_skips_sync_dispatch() -> None: +async def test_fake_broker_publish_with_activate_at_dispatches_immediately() -> None: broker = _make_broker() received: list[str] = [] @broker.subscriber("orders") async def handle(body: str) -> None: - received.append(body) # pragma: no cover + received.append(body) test_broker = TestOutboxBroker(broker) async with test_broker: future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=5) await broker.publish("at-future", queue="orders", activate_at=future) # ty: ignore[missing-argument] - assert received == [] - assert len(test_broker.fake_client.rows) == 1 + assert received == ["at-future"] + assert test_broker.fake_client.rows == [] -async def test_fake_broker_publish_batch_with_activate_in_skips_sync_dispatch() -> None: +async def test_fake_broker_publish_batch_with_activate_in_dispatches_immediately() -> None: broker = _make_broker() received: list[str] = [] @broker.subscriber("orders") async def handle(body: str) -> None: - received.append(body) # pragma: no cover + received.append(body) test_broker = TestOutboxBroker(broker) async with test_broker: @@ -220,25 +220,25 @@ async def handle(body: str) -> None: activate_in=_dt.timedelta(seconds=5), ) - assert received == [] - assert len(test_broker.fake_client.rows) == 2 + assert received == ["a", "b"] + assert test_broker.fake_client.rows == [] -async def test_fake_broker_publish_batch_with_activate_at_skips_sync_dispatch() -> None: +async def test_fake_broker_publish_batch_with_activate_at_dispatches_immediately() -> None: broker = _make_broker() received: list[str] = [] @broker.subscriber("orders") async def handle(body: str) -> None: - received.append(body) # pragma: no cover + received.append(body) test_broker = TestOutboxBroker(broker) async with test_broker: future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=5) await broker.publish_batch("a", "b", queue="orders", activate_at=future) # ty: ignore[missing-argument] - assert received == [] - assert len(test_broker.fake_client.rows) == 2 + assert received == ["a", "b"] + assert test_broker.fake_client.rows == [] async def test_fake_broker_publish_rejects_both_activate_in_and_activate_at() -> None: