Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on

### Test broker

`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts) but runs the **real** `OutboxSubscriber` loops — fetch / worker — so tests exercise the actual delivery path. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`).
`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts). Two dispatch modes:

- **Sync (default, `run_loops=False`)**: `broker.publish` synchronously routes through `OutboxSubscriber.dispatch_one` — matches the FastStream test-broker idiom (`TestKafkaBroker` / `TestRabbitBroker`). The handler runs before `publish` returns; no background loops. `broker.publish_batch`, `cancel_timer`, and `fetch_unprocessed` are also patched to operate on the fake client (the `session` argument is ignored). Future-dated rows (`activate_in`/`activate_at`) intentionally are *not* dispatched — they sit in the fake client, matching production where they wait for the gate.
- **Loop (`run_loops=True`)**: spins up the real `_fetch_loop` / `_worker_loop` against the fake client. Required for tests that exercise retry rescheduling, lease-expiry reclaim, fetch-loop error recovery, or scheduled delivery firing. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`).

`OutboxSubscriber.dispatch_one(row)` is the public per-row dispatch entry point. The worker loop calls it; the test broker calls it directly. Caller must have already acquired the row's lease.

### Engine ownership

Expand Down
44 changes: 28 additions & 16 deletions faststream_outbox/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async def _fetch_loop(self) -> None:
# cleanly (rather than raising RuntimeError) prevents FastStream's supervisor
# from restarting the task and leaking a pending coroutine at GC time.
client = self._outer_config.client
if client is None:
if client is None: # pragma: no cover # defensive teardown race; hard to deterministically hit
return
engine = client.engine
try:
Expand Down Expand Up @@ -262,27 +262,39 @@ def _on_notify(self, *_args: object) -> None:
self._notify_event.set()

async def _worker_loop(self) -> None:
logger = self._outer_config.logger.logger.logger if self._outer_config.logger else None
while self.running:
row = await self._inflight.get()
try:
row.retry_strategy = self._config.retry_strategy
if not row.allow_delivery(max_deliveries=self._config.max_deliveries, logger=logger):
await self._flush_terminal(row)
continue
# AckPolicy middleware catches handler exceptions; _CaptureExceptionMiddleware
# stashes exc onto row.last_exception before nack runs, so retry strategies
# can branch on exception type.
try:
await self.consume(row)
finally:
await row.assert_state_set(logger)
await self._flush_result(row)
except Exception as e: # noqa: BLE001
self._log(log_level=logging.ERROR, message=f"Outbox worker error: {e!r}", exc_info=e)
await self.dispatch_one(row)
finally:
self._inflight.task_done()

async def dispatch_one(self, row: OutboxInnerMessage) -> None:
"""
Run a single already-leased row through the full consume pipeline.

Mirrors the per-row body of ``_worker_loop`` so ``TestOutboxBroker`` can drive
the handler synchronously from ``broker.publish``, matching the FastStream
test-broker idiom (``TestKafkaBroker`` / ``TestRabbitBroker``). The caller is
responsible for having acquired the row's lease before invoking this.
"""
logger = self._outer_config.logger.logger.logger if self._outer_config.logger else None
try:
row.retry_strategy = self._config.retry_strategy
if not row.allow_delivery(max_deliveries=self._config.max_deliveries, logger=logger):
await self._flush_terminal(row)
return
# AckPolicy middleware catches handler exceptions; _CaptureExceptionMiddleware
# stashes exc onto row.last_exception before nack runs, so retry strategies
# can branch on exception type.
try:
await self.consume(row)
finally:
await row.assert_state_set(logger)
await self._flush_result(row)
except Exception as e: # noqa: BLE001
self._log(log_level=logging.ERROR, message=f"Outbox worker error: {e!r}", exc_info=e)

async def _flush_result(self, row: OutboxInnerMessage) -> None:
if row.to_delete:
await self._flush_terminal(row)
Expand Down
103 changes: 91 additions & 12 deletions faststream_outbox/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
Test broker with an in-memory ``OutboxClient`` substitute.

``TestOutboxBroker`` wraps an ``OutboxBroker`` and swaps in a ``FakeOutboxClient``
backed by a list of dicts. The real ``OutboxSubscriber`` runs unmodified — same
fetch / worker loops — so tests exercise the actual delivery path, not a shortcut.
``feed()`` simulates a row insert.
backed by a list of dicts. Defaults to **sync dispatch**: ``await broker.publish(...)``
finds the matching subscriber and awaits its consume pipeline before returning, the
same model as ``TestKafkaBroker`` / ``TestRabbitBroker``. Pass ``run_loops=True`` to
restore the loop-driven behavior — the real ``_fetch_loop`` / ``_worker_loop`` run
against the in-memory client; required for tests that exercise retry rescheduling,
lease expiry, or fetch-loop error recovery. ``feed()`` simulates a row insert.
"""

import datetime as _dt
Expand Down Expand Up @@ -194,9 +197,38 @@ def _to_inner(row: _FakeRow) -> OutboxInnerMessage:
)


def _find_subscriber_for_queue(broker: OutboxBroker, queue: str) -> "OutboxSubscriber | None":
"""First matching subscriber wins — mirrors production fetch behavior for overlapping subscribers."""
for raw_subscriber in broker.subscribers:
sub = typing.cast("OutboxSubscriber", raw_subscriber)
if not sub.calls:
continue
if queue in sub._config.queues: # noqa: SLF001
return sub
return None


async def _sync_dispatch(fake_client: FakeOutboxClient, broker: OutboxBroker, queue: str, row_id: int) -> None:
"""Acquire the just-fed row's lease in place and run it through the subscriber pipeline."""
subscriber = _find_subscriber_for_queue(broker, queue)
if subscriber is None:
# No handler for this queue — leave the row in the fake client for inspection.
return
fake_row = next((r for r in fake_client.rows if r.id == row_id), None)
if fake_row is None: # pragma: no cover # defensive: feed just returned this id
return
fake_row.acquired_token = uuid.uuid4()
fake_row.acquired_at = _utcnow()
fake_row.deliveries_count += 1
await subscriber.dispatch_one(_to_inner(fake_row))


def _build_fake_publish(
fake_client: FakeOutboxClient,
broker: OutboxBroker,
serializer: typing.Any,
*,
run_loops: bool,
) -> typing.Callable[..., typing.Awaitable[int | None]]:
async def fake_publish( # noqa: PLR0913
body: typing.Any,
Expand Down Expand Up @@ -225,20 +257,30 @@ async def fake_publish( # noqa: PLR0913
next_at = _utcnow() + activate_in
elif activate_at is not None:
next_at = activate_at
return fake_client.feed(
row_id = fake_client.feed(
queue=queue,
payload=payload,
headers=hdrs,
next_attempt_at=next_at,
timer_id=timer_id,
)
# Sync dispatch only when:
# - loop mode is off (loops would re-dispatch the row otherwise),
# - the insert wasn't a timer-dedup no-op,
# - the row is eligible now (future-dated rows wait for their gate, same as production).
if not run_loops and row_id is not None and (next_at is None or next_at <= _utcnow()):
await _sync_dispatch(fake_client, broker, queue, row_id)
return row_id

return fake_publish


def _build_fake_publish_batch(
fake_client: FakeOutboxClient,
broker: OutboxBroker,
serializer: typing.Any,
*,
run_loops: bool,
) -> typing.Callable[..., typing.Awaitable[None]]:
async def fake_publish_batch(
*bodies: typing.Any,
Expand All @@ -259,14 +301,17 @@ async def fake_publish_batch(
next_at = _utcnow() + activate_in
elif activate_at is not None:
next_at = activate_at
eligible_now = next_at is None or next_at <= _utcnow()
for body in bodies:
payload, hdrs = _encode_payload(body, headers=headers, serializer=serializer)
fake_client.feed(
row_id = fake_client.feed(
queue=queue,
payload=payload,
headers=hdrs,
next_attempt_at=next_at,
)
if not run_loops and row_id is not None and eligible_now:
await _sync_dispatch(fake_client, broker, queue, row_id)

return fake_publish_batch

Expand All @@ -286,14 +331,45 @@ async def fake_cancel_timer(
return fake_cancel_timer


def _build_fake_fetch_unprocessed(
fake_client: FakeOutboxClient,
) -> typing.Callable[..., typing.Awaitable[list[OutboxInnerMessage]]]:
async def fake_fetch_unprocessed(
*,
session: typing.Any = None,
queue: str | None = None,
) -> list[OutboxInnerMessage]:
del session
rows = sorted(fake_client.rows, key=lambda r: r.id)
if queue is not None:
rows = [r for r in rows if r.queue == queue]
return [_to_inner(r) for r in rows]

return fake_fetch_unprocessed


class TestOutboxBroker(TestBroker[OutboxBroker]): # ty: ignore[invalid-type-arguments]
"""Test harness that runs the real subscriber loops against an in-memory client."""
"""
Test harness for ``OutboxBroker``. Two dispatch modes.

Default (``run_loops=False``): ``broker.publish`` synchronously drives the matching
subscriber's consume pipeline, so handlers run before ``publish`` returns. Matches the
FastStream test-broker idiom — ``TestKafkaBroker`` / ``TestRabbitBroker`` behave the
same way. Future-dated rows (``activate_in`` / ``activate_at``) stay in the fake
client and are *not* dispatched, mirroring production where they wait for the gate.

Pass ``run_loops=True`` to spin up the real ``_fetch_loop`` / ``_worker_loop`` against
the in-memory client. Required for tests that exercise loop-driven behavior:
retry rescheduling, lease expiry reclaim, or fetch-loop error recovery.
"""

fake_client: FakeOutboxClient
run_loops: bool

def __init__(self, broker: OutboxBroker, **kwargs: typing.Any) -> None:
def __init__(self, broker: OutboxBroker, *, run_loops: bool = False, **kwargs: typing.Any) -> None:
super().__init__(broker, **kwargs)
self.fake_client = FakeOutboxClient()
self.run_loops = run_loops

def feed(
self,
Expand Down Expand Up @@ -326,26 +402,29 @@ def _patch_broker(self, broker: OutboxBroker) -> "Iterator[None]":
# Mirror real publish's serializer wiring so pydantic / dataclass bodies
# encode identically in tests.
serializer = broker.config.broker_config.fd_config._serializer # noqa: SLF001
fake_publish = _build_fake_publish(self.fake_client, serializer)
fake_publish_batch = _build_fake_publish_batch(self.fake_client, serializer)
fake_publish = _build_fake_publish(self.fake_client, broker, serializer, run_loops=self.run_loops)
fake_publish_batch = _build_fake_publish_batch(self.fake_client, broker, serializer, run_loops=self.run_loops)
fake_cancel_timer = _build_fake_cancel_timer(self.fake_client)
fake_fetch_unprocessed = _build_fake_fetch_unprocessed(self.fake_client)
try:
with (
mock.patch.object(broker, "publish", new=fake_publish),
mock.patch.object(broker, "publish_batch", new=fake_publish_batch),
mock.patch.object(broker, "cancel_timer", new=fake_cancel_timer),
mock.patch.object(broker, "fetch_unprocessed", new=fake_fetch_unprocessed),
super()._patch_broker(broker),
):
yield
finally:
broker.config.broker_config.client = original_client

def _fake_start(self, broker: OutboxBroker, *args: typing.Any, **kwargs: typing.Any) -> None:

# Run the parent _fake_start (sets up publisher fakes, calls _post_start, etc.)
super()._fake_start(broker, *args, **kwargs)
# Then spin up the real subscriber loops against the in-memory fake client. Without this,
# ``feed()`` would drop rows on the floor — there's no producer to fall back to.
# In sync mode, publish drives dispatch directly — don't spawn the loops.
if not self.run_loops:
return
# Loop mode: spin up the real subscriber loops against the in-memory fake client.
# Skip subscribers without a registered handler — matches OutboxSubscriber.start()'s
# ``if not self.calls: return`` behavior so the test broker doesn't access ``_client``
# for an inert subscriber.
Expand Down
Loading
Loading