diff --git a/CLAUDE.md b/CLAUDE.md index 3acc4ed..38a5466 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -60,7 +60,12 @@ Every terminal write (`delete_with_lease`, `mark_pending_with_lease`) filters on ### Test broker -`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts) but runs the **real** `OutboxSubscriber` loops — fetch / worker — so tests exercise the actual delivery path. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`). +`TestOutboxBroker` (in `testing.py`) swaps in a `FakeOutboxClient` (in-memory list of `_FakeRow` dicts). Two dispatch modes: + +- **Sync (default, `run_loops=False`)**: `broker.publish` synchronously routes through `OutboxSubscriber.dispatch_one` — matches the FastStream test-broker idiom (`TestKafkaBroker` / `TestRabbitBroker`). The handler runs before `publish` returns; no background loops. `broker.publish_batch`, `cancel_timer`, and `fetch_unprocessed` are also patched to operate on the fake client (the `session` argument is ignored). Future-dated rows (`activate_in`/`activate_at`) intentionally are *not* dispatched — they sit in the fake client, matching production where they wait for the gate. +- **Loop (`run_loops=True`)**: spins up the real `_fetch_loop` / `_worker_loop` against the fake client. Required for tests that exercise retry rescheduling, lease-expiry reclaim, fetch-loop error recovery, or scheduled delivery firing. Subscribers without registered handlers are skipped in `_fake_start` (mirrors `OutboxSubscriber.start`'s `if not self.calls: return`). + +`OutboxSubscriber.dispatch_one(row)` is the public per-row dispatch entry point. The worker loop calls it; the test broker calls it directly. Caller must have already acquired the row's lease. ### Engine ownership diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index 58fa725..082c3fa 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -154,7 +154,7 @@ async def _fetch_loop(self) -> None: # cleanly (rather than raising RuntimeError) prevents FastStream's supervisor # from restarting the task and leaking a pending coroutine at GC time. client = self._outer_config.client - if client is None: + if client is None: # pragma: no cover # defensive teardown race; hard to deterministically hit return engine = client.engine try: @@ -262,27 +262,39 @@ def _on_notify(self, *_args: object) -> None: self._notify_event.set() async def _worker_loop(self) -> None: - logger = self._outer_config.logger.logger.logger if self._outer_config.logger else None while self.running: row = await self._inflight.get() try: - row.retry_strategy = self._config.retry_strategy - if not row.allow_delivery(max_deliveries=self._config.max_deliveries, logger=logger): - await self._flush_terminal(row) - continue - # AckPolicy middleware catches handler exceptions; _CaptureExceptionMiddleware - # stashes exc onto row.last_exception before nack runs, so retry strategies - # can branch on exception type. - try: - await self.consume(row) - finally: - await row.assert_state_set(logger) - await self._flush_result(row) - except Exception as e: # noqa: BLE001 - self._log(log_level=logging.ERROR, message=f"Outbox worker error: {e!r}", exc_info=e) + await self.dispatch_one(row) finally: self._inflight.task_done() + async def dispatch_one(self, row: OutboxInnerMessage) -> None: + """ + Run a single already-leased row through the full consume pipeline. + + Mirrors the per-row body of ``_worker_loop`` so ``TestOutboxBroker`` can drive + the handler synchronously from ``broker.publish``, matching the FastStream + test-broker idiom (``TestKafkaBroker`` / ``TestRabbitBroker``). The caller is + responsible for having acquired the row's lease before invoking this. + """ + logger = self._outer_config.logger.logger.logger if self._outer_config.logger else None + try: + row.retry_strategy = self._config.retry_strategy + if not row.allow_delivery(max_deliveries=self._config.max_deliveries, logger=logger): + await self._flush_terminal(row) + return + # AckPolicy middleware catches handler exceptions; _CaptureExceptionMiddleware + # stashes exc onto row.last_exception before nack runs, so retry strategies + # can branch on exception type. + try: + await self.consume(row) + finally: + await row.assert_state_set(logger) + await self._flush_result(row) + except Exception as e: # noqa: BLE001 + self._log(log_level=logging.ERROR, message=f"Outbox worker error: {e!r}", exc_info=e) + async def _flush_result(self, row: OutboxInnerMessage) -> None: if row.to_delete: await self._flush_terminal(row) diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index 924fc70..ce88b76 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -2,9 +2,12 @@ Test broker with an in-memory ``OutboxClient`` substitute. ``TestOutboxBroker`` wraps an ``OutboxBroker`` and swaps in a ``FakeOutboxClient`` -backed by a list of dicts. The real ``OutboxSubscriber`` runs unmodified — same -fetch / worker loops — so tests exercise the actual delivery path, not a shortcut. -``feed()`` simulates a row insert. +backed by a list of dicts. Defaults to **sync dispatch**: ``await broker.publish(...)`` +finds the matching subscriber and awaits its consume pipeline before returning, the +same model as ``TestKafkaBroker`` / ``TestRabbitBroker``. Pass ``run_loops=True`` to +restore the loop-driven behavior — the real ``_fetch_loop`` / ``_worker_loop`` run +against the in-memory client; required for tests that exercise retry rescheduling, +lease expiry, or fetch-loop error recovery. ``feed()`` simulates a row insert. """ import datetime as _dt @@ -194,9 +197,38 @@ def _to_inner(row: _FakeRow) -> OutboxInnerMessage: ) +def _find_subscriber_for_queue(broker: OutboxBroker, queue: str) -> "OutboxSubscriber | None": + """First matching subscriber wins — mirrors production fetch behavior for overlapping subscribers.""" + for raw_subscriber in broker.subscribers: + sub = typing.cast("OutboxSubscriber", raw_subscriber) + if not sub.calls: + continue + if queue in sub._config.queues: # noqa: SLF001 + return sub + return None + + +async def _sync_dispatch(fake_client: FakeOutboxClient, broker: OutboxBroker, queue: str, row_id: int) -> None: + """Acquire the just-fed row's lease in place and run it through the subscriber pipeline.""" + subscriber = _find_subscriber_for_queue(broker, queue) + if subscriber is None: + # No handler for this queue — leave the row in the fake client for inspection. + return + fake_row = next((r for r in fake_client.rows if r.id == row_id), None) + if fake_row is None: # pragma: no cover # defensive: feed just returned this id + return + fake_row.acquired_token = uuid.uuid4() + fake_row.acquired_at = _utcnow() + fake_row.deliveries_count += 1 + await subscriber.dispatch_one(_to_inner(fake_row)) + + def _build_fake_publish( fake_client: FakeOutboxClient, + broker: OutboxBroker, serializer: typing.Any, + *, + run_loops: bool, ) -> typing.Callable[..., typing.Awaitable[int | None]]: async def fake_publish( # noqa: PLR0913 body: typing.Any, @@ -225,20 +257,30 @@ async def fake_publish( # noqa: PLR0913 next_at = _utcnow() + activate_in elif activate_at is not None: next_at = activate_at - return fake_client.feed( + row_id = fake_client.feed( queue=queue, payload=payload, headers=hdrs, next_attempt_at=next_at, timer_id=timer_id, ) + # Sync dispatch only when: + # - loop mode is off (loops would re-dispatch the row otherwise), + # - the insert wasn't a timer-dedup no-op, + # - the row is eligible now (future-dated rows wait for their gate, same as production). + if not run_loops and row_id is not None and (next_at is None or next_at <= _utcnow()): + await _sync_dispatch(fake_client, broker, queue, row_id) + return row_id return fake_publish def _build_fake_publish_batch( fake_client: FakeOutboxClient, + broker: OutboxBroker, serializer: typing.Any, + *, + run_loops: bool, ) -> typing.Callable[..., typing.Awaitable[None]]: async def fake_publish_batch( *bodies: typing.Any, @@ -259,14 +301,17 @@ async def fake_publish_batch( next_at = _utcnow() + activate_in elif activate_at is not None: next_at = activate_at + eligible_now = next_at is None or next_at <= _utcnow() for body in bodies: payload, hdrs = _encode_payload(body, headers=headers, serializer=serializer) - fake_client.feed( + row_id = fake_client.feed( queue=queue, payload=payload, headers=hdrs, next_attempt_at=next_at, ) + if not run_loops and row_id is not None and eligible_now: + await _sync_dispatch(fake_client, broker, queue, row_id) return fake_publish_batch @@ -286,14 +331,45 @@ async def fake_cancel_timer( return fake_cancel_timer +def _build_fake_fetch_unprocessed( + fake_client: FakeOutboxClient, +) -> typing.Callable[..., typing.Awaitable[list[OutboxInnerMessage]]]: + async def fake_fetch_unprocessed( + *, + session: typing.Any = None, + queue: str | None = None, + ) -> list[OutboxInnerMessage]: + del session + rows = sorted(fake_client.rows, key=lambda r: r.id) + if queue is not None: + rows = [r for r in rows if r.queue == queue] + return [_to_inner(r) for r in rows] + + return fake_fetch_unprocessed + + class TestOutboxBroker(TestBroker[OutboxBroker]): # ty: ignore[invalid-type-arguments] - """Test harness that runs the real subscriber loops against an in-memory client.""" + """ + Test harness for ``OutboxBroker``. Two dispatch modes. + + Default (``run_loops=False``): ``broker.publish`` synchronously drives the matching + subscriber's consume pipeline, so handlers run before ``publish`` returns. Matches the + FastStream test-broker idiom — ``TestKafkaBroker`` / ``TestRabbitBroker`` behave the + same way. Future-dated rows (``activate_in`` / ``activate_at``) stay in the fake + client and are *not* dispatched, mirroring production where they wait for the gate. + + Pass ``run_loops=True`` to spin up the real ``_fetch_loop`` / ``_worker_loop`` against + the in-memory client. Required for tests that exercise loop-driven behavior: + retry rescheduling, lease expiry reclaim, or fetch-loop error recovery. + """ fake_client: FakeOutboxClient + run_loops: bool - def __init__(self, broker: OutboxBroker, **kwargs: typing.Any) -> None: + def __init__(self, broker: OutboxBroker, *, run_loops: bool = False, **kwargs: typing.Any) -> None: super().__init__(broker, **kwargs) self.fake_client = FakeOutboxClient() + self.run_loops = run_loops def feed( self, @@ -326,14 +402,16 @@ def _patch_broker(self, broker: OutboxBroker) -> "Iterator[None]": # Mirror real publish's serializer wiring so pydantic / dataclass bodies # encode identically in tests. serializer = broker.config.broker_config.fd_config._serializer # noqa: SLF001 - fake_publish = _build_fake_publish(self.fake_client, serializer) - fake_publish_batch = _build_fake_publish_batch(self.fake_client, serializer) + fake_publish = _build_fake_publish(self.fake_client, broker, serializer, run_loops=self.run_loops) + fake_publish_batch = _build_fake_publish_batch(self.fake_client, broker, serializer, run_loops=self.run_loops) fake_cancel_timer = _build_fake_cancel_timer(self.fake_client) + fake_fetch_unprocessed = _build_fake_fetch_unprocessed(self.fake_client) try: with ( mock.patch.object(broker, "publish", new=fake_publish), mock.patch.object(broker, "publish_batch", new=fake_publish_batch), mock.patch.object(broker, "cancel_timer", new=fake_cancel_timer), + mock.patch.object(broker, "fetch_unprocessed", new=fake_fetch_unprocessed), super()._patch_broker(broker), ): yield @@ -341,11 +419,12 @@ def _patch_broker(self, broker: OutboxBroker) -> "Iterator[None]": broker.config.broker_config.client = original_client def _fake_start(self, broker: OutboxBroker, *args: typing.Any, **kwargs: typing.Any) -> None: - # Run the parent _fake_start (sets up publisher fakes, calls _post_start, etc.) super()._fake_start(broker, *args, **kwargs) - # Then spin up the real subscriber loops against the in-memory fake client. Without this, - # ``feed()`` would drop rows on the floor — there's no producer to fall back to. + # In sync mode, publish drives dispatch directly — don't spawn the loops. + if not self.run_loops: + return + # Loop mode: spin up the real subscriber loops against the in-memory fake client. # Skip subscribers without a registered handler — matches OutboxSubscriber.start()'s # ``if not self.calls: return`` behavior so the test broker doesn't access ``_client`` # for an inert subscriber. diff --git a/tests/test_fake.py b/tests/test_fake.py index dd7b19d..76aaef8 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -25,6 +25,7 @@ def _make_broker() -> OutboxBroker: async def _wait_until(predicate: Callable[[], object], *, timeout: float = 2.0) -> None: # noqa: ASYNC109 + # Used by run_loops=True tests; sync-mode tests assert directly after publish. deadline = asyncio.get_event_loop().time() + timeout while asyncio.get_event_loop().time() < deadline: if predicate(): @@ -34,56 +35,69 @@ async def _wait_until(predicate: Callable[[], object], *, timeout: float = 2.0) raise AssertionError(msg) # pragma: no cover -async def test_fake_broker_delivers_to_handler() -> None: +# --- Sync-mode tests (default TestOutboxBroker) --------------------------------------- + + +async def test_fake_broker_publish_triggers_handler() -> None: + """``broker.publish`` synchronously dispatches the handler, FastStream-test-broker style.""" broker = _make_broker() received: list[dict] = [] - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber("orders") async def handle(body: dict) -> None: received.append(body) test_broker = TestOutboxBroker(broker) async with test_broker: - payload, headers = encode_payload({"order_id": 1}) - test_broker.feed("orders", payload, headers=headers) - await _wait_until(lambda: len(received) == 1) + await broker.publish({"order_id": 1}, queue="orders") # ty: ignore[missing-argument] assert received == [{"order_id": 1}] assert test_broker.fake_client.rows == [] # row deleted after ack +async def test_fake_broker_publish_batch_triggers_handler() -> None: + broker = _make_broker() + received: list[str] = [] + + @broker.subscriber("orders") + async def handle(body: str) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish_batch("a", "b", "c", queue="orders") # ty: ignore[missing-argument] + + assert received == ["a", "b", "c"] + assert test_broker.fake_client.rows == [] + + async def test_fake_broker_multi_queue_subscriber() -> None: broker = _make_broker() seen: list[str] = [] - @broker.subscriber(["orders", "shipments"], min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber(["orders", "shipments"]) async def handle(body: str) -> None: seen.append(body) test_broker = TestOutboxBroker(broker) async with test_broker: - p1, h1 = encode_payload("o-1") - p2, h2 = encode_payload("s-1") - test_broker.feed("orders", p1, headers=h1) - test_broker.feed("shipments", p2, headers=h2) - await _wait_until(lambda: len(seen) == 2) + await broker.publish("o-1", queue="orders") # ty: ignore[missing-argument] + await broker.publish("s-1", queue="shipments") # ty: ignore[missing-argument] - assert sorted(seen) == ["o-1", "s-1"] + assert seen == ["o-1", "s-1"] -async def test_fake_broker_ignores_other_queues() -> None: +async def test_fake_broker_publish_to_unhandled_queue_leaves_row() -> None: + """Publishing to a queue with no matching subscriber leaves the row in the fake client.""" broker = _make_broker() - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber("orders") async def handle(body: str) -> None: ... test_broker = TestOutboxBroker(broker) async with test_broker: - p, h = encode_payload("not-mine") - test_broker.feed("other-queue", p, headers=h) - await asyncio.sleep(0.2) + await broker.publish("not-mine", queue="other-queue") # ty: ignore[missing-argument] - # Row stays in fake client because no subscriber matches that queue assert len(test_broker.fake_client.rows) == 1 assert test_broker.fake_client.rows[0].queue == "other-queue" @@ -91,7 +105,7 @@ async def handle(body: str) -> None: ... async def test_fake_broker_failing_handler_with_no_retry_deletes_row() -> None: broker = _make_broker() - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber("orders") async def handle(body: str) -> None: del body msg = "boom" @@ -99,54 +113,9 @@ async def handle(body: str) -> None: test_broker = TestOutboxBroker(broker) async with test_broker: - p, h = encode_payload("x") - test_broker.feed("orders", p, headers=h) - await _wait_until(lambda: not test_broker.fake_client.rows, timeout=2.0) - - -async def test_fake_broker_failing_handler_with_retry_reschedules() -> None: - broker = _make_broker() - attempts: list[str] = [] - - @broker.subscriber( - "orders", - min_fetch_interval=0.01, - max_fetch_interval=0.05, - retry_strategy=ConstantRetry(delay_seconds=0.05, max_attempts=3), - ) - async def handle(body: str) -> None: - attempts.append(body) - if len(attempts) < 3: - msg = "transient" - raise RuntimeError(msg) - - test_broker = TestOutboxBroker(broker) - async with test_broker: - p, h = encode_payload("retry-me") - test_broker.feed("orders", p, headers=h) - await _wait_until(lambda: len(attempts) == 3, timeout=3.0) - await _wait_until(lambda: not test_broker.fake_client.rows, timeout=1.0) - - -async def test_fake_broker_max_deliveries_drops_row() -> None: - broker = _make_broker() - - @broker.subscriber( - "orders", - min_fetch_interval=0.01, - max_fetch_interval=0.05, - max_deliveries=1, - ) - async def handle(body: str) -> None: ... + await broker.publish("x", queue="orders") # ty: ignore[missing-argument] - test_broker = TestOutboxBroker(broker) - async with test_broker: - p, h = encode_payload("never") - test_broker.feed("orders", p, headers=h) - # Pre-bump deliveries_count so allow_delivery rejects on first claim — handler - # never runs, the row is just dropped. - test_broker.fake_client.rows[0].deliveries_count = 5 - await _wait_until(lambda: not test_broker.fake_client.rows, timeout=2.0) + assert test_broker.fake_client.rows == [] async def test_fake_broker_correlation_id_in_handler_context() -> None: @@ -155,59 +124,24 @@ async def test_fake_broker_correlation_id_in_handler_context() -> None: broker = _make_broker() seen: list[str] = [] - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber("orders") async def handle(body: dict, correlation_id: str = Context("message.correlation_id")) -> None: del body seen.append(correlation_id) test_broker = TestOutboxBroker(broker) async with test_broker: - p, h = encode_payload({"x": 1}, correlation_id="trace-xyz") - test_broker.feed("orders", p, headers=h) - await _wait_until(lambda: seen, timeout=2.0) + await broker.publish({"x": 1}, queue="orders", correlation_id="trace-xyz") # ty: ignore[missing-argument] assert seen == ["trace-xyz"] -async def test_fake_broker_expired_lease_is_reclaimed() -> None: - broker = _make_broker() - received: list[str] = [] - - @broker.subscriber( - "orders", - min_fetch_interval=0.01, - max_fetch_interval=0.05, - lease_ttl_seconds=0.1, - ) - async def handle(body: str) -> None: - received.append(body) - - test_broker = TestOutboxBroker(broker) - async with test_broker: - # Manually create a row with an expired lease — fetch must reclaim it. - old = _dt.datetime.now(tz=_dt.UTC) - _dt.timedelta(seconds=10) - test_broker.fake_client._rows.append( # noqa: SLF001 - _FakeRow( - id=99, - queue="orders", - payload=encode_payload("stuck-payload")[0], - headers=encode_payload("stuck-payload")[1], - acquired_at=old, - acquired_token=uuid.uuid4(), - ) - ) - test_broker.fake_client._next_id = 100 # noqa: SLF001 - await _wait_until(lambda: received, timeout=3.0) - - async def test_fake_broker_no_handler_no_dispatch() -> None: broker = _make_broker() test_broker = TestOutboxBroker(broker) async with test_broker: - # No subscribers registered — feeding a row should leave it in fake client - p, h = encode_payload("nope") - test_broker.feed("orders", p, headers=h) - await asyncio.sleep(0.1) + # No subscribers registered — publishing should leave the row in the fake client. + await broker.publish("nope", queue="orders") # ty: ignore[missing-argument] assert len(test_broker.fake_client.rows) == 1 @@ -219,79 +153,92 @@ async def test_fake_broker_request_raises() -> None: await broker.request(b"x") -async def test_fake_broker_publish_triggers_handler() -> None: - """ - ``broker.publish`` inside ``TestOutboxBroker`` must route to the fake client and fire the handler. +async def test_fake_broker_publish_with_timer_id_dedups() -> None: + broker = _make_broker() + # No subscriber for "timers": rows persist so we can observe dedup behavior. + test_broker = TestOutboxBroker(broker) + async with test_broker: + first = await broker.publish("x", queue="timers", timer_id="email-1") # ty: ignore[missing-argument] + second = await broker.publish("y", queue="timers", timer_id="email-1") # ty: ignore[missing-argument] + assert first is not None + assert second is None + assert len(test_broker.fake_client.rows) == 1 - This is the standard FastStream test-broker idiom — the same call site works in - tests as in production, with the session argument ignored in test mode. - """ + +async def test_fake_broker_publish_with_activate_in_skips_sync_dispatch() -> None: + """Future-dated rows are not dispatched in sync mode — they wait for the gate (production parity).""" broker = _make_broker() - received: list[dict] = [] + received: list[str] = [] - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) - async def handle(body: dict) -> None: - received.append(body) + @broker.subscriber("orders") + async def handle(body: str) -> None: + received.append(body) # pragma: no cover # handler must not fire for future-dated rows test_broker = TestOutboxBroker(broker) async with test_broker: - await broker.publish({"order_id": 1}, queue="orders") # ty: ignore[missing-argument] - await _wait_until(lambda: received, timeout=2.0) + await broker.publish( # ty: ignore[missing-argument] + "delayed", + queue="orders", + activate_in=_dt.timedelta(seconds=5), + ) - assert received == [{"order_id": 1}] - assert test_broker.fake_client.rows == [] # row deleted after ack + assert received == [] + assert len(test_broker.fake_client.rows) == 1 -async def test_fake_broker_publish_batch_triggers_handler() -> None: +async def test_fake_broker_publish_with_activate_at_skips_sync_dispatch() -> None: broker = _make_broker() received: list[str] = [] - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber("orders") async def handle(body: str) -> None: - received.append(body) + received.append(body) # pragma: no cover test_broker = TestOutboxBroker(broker) async with test_broker: - await broker.publish_batch("a", "b", "c", queue="orders") # ty: ignore[missing-argument] - await _wait_until(lambda: len(received) == 3, timeout=2.0) + future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=5) + await broker.publish("at-future", queue="orders", activate_at=future) # ty: ignore[missing-argument] - assert sorted(received) == ["a", "b", "c"] + assert received == [] + assert len(test_broker.fake_client.rows) == 1 -async def test_fake_broker_publish_with_timer_id_dedups() -> None: +async def test_fake_broker_publish_batch_with_activate_in_skips_sync_dispatch() -> None: broker = _make_broker() + received: list[str] = [] - @broker.subscriber("orders", min_fetch_interval=10.0, max_fetch_interval=10.0) - async def handle(body: str) -> None: ... + @broker.subscriber("orders") + async def handle(body: str) -> None: + received.append(body) # pragma: no cover test_broker = TestOutboxBroker(broker) async with test_broker: - first = await broker.publish("x", queue="orders", timer_id="email-1") # ty: ignore[missing-argument] - second = await broker.publish("y", queue="orders", timer_id="email-1") # ty: ignore[missing-argument] - assert first is not None - assert second is None - assert len(test_broker.fake_client.rows) == 1 + await broker.publish_batch( # ty: ignore[missing-argument] + "a", + "b", + queue="orders", + activate_in=_dt.timedelta(seconds=5), + ) + + assert received == [] + assert len(test_broker.fake_client.rows) == 2 -async def test_fake_broker_publish_with_activate_in_delays_delivery() -> None: +async def test_fake_broker_publish_batch_with_activate_at_skips_sync_dispatch() -> None: broker = _make_broker() received: list[str] = [] - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber("orders") async def handle(body: str) -> None: - received.append(body) + received.append(body) # pragma: no cover test_broker = TestOutboxBroker(broker) async with test_broker: - await broker.publish( # ty: ignore[missing-argument] - "delayed", - queue="orders", - activate_in=_dt.timedelta(milliseconds=300), - ) - await asyncio.sleep(0.1) - assert received == [] - await _wait_until(lambda: received, timeout=2.0) - assert received == ["delayed"] + future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(seconds=5) + await broker.publish_batch("a", "b", queue="orders", activate_at=future) # ty: ignore[missing-argument] + + assert received == [] + assert len(test_broker.fake_client.rows) == 2 async def test_fake_broker_publish_rejects_both_activate_in_and_activate_at() -> None: @@ -307,102 +254,286 @@ async def test_fake_broker_publish_rejects_both_activate_in_and_activate_at() -> ) +async def test_fake_broker_publish_batch_rejects_both_activate_in_and_activate_at() -> None: + broker = _make_broker() + test_broker = TestOutboxBroker(broker) + async with test_broker: + with pytest.raises(ValueError, match="at most one of activate_in / activate_at"): + await broker.publish_batch( # ty: ignore[missing-argument] + "x", + queue="orders", + activate_in=_dt.timedelta(seconds=1), + activate_at=_dt.datetime.now(tz=_dt.UTC), + ) + + +async def test_fake_broker_publish_batch_empty_bodies_is_noop() -> None: + broker = _make_broker() + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish_batch(queue="orders") # ty: ignore[missing-argument] + assert test_broker.fake_client.rows == [] + + async def test_fake_broker_cancel_timer_removes_row() -> None: broker = _make_broker() test_broker = TestOutboxBroker(broker) + # No handler for "timers" — row persists after publish so we can cancel it. async with test_broker: - await broker.publish("x", queue="orders", timer_id="email-1") # ty: ignore[missing-argument] + await broker.publish("x", queue="timers", timer_id="email-1") # ty: ignore[missing-argument] assert len(test_broker.fake_client.rows) == 1 - cancelled = await broker.cancel_timer(queue="orders", timer_id="email-1") # ty: ignore[missing-argument] + cancelled = await broker.cancel_timer(queue="timers", timer_id="email-1") # ty: ignore[missing-argument] assert cancelled is True assert test_broker.fake_client.rows == [] -async def test_fake_broker_publish_with_activate_at_delays_delivery() -> None: +async def test_fake_broker_fetch_unprocessed_reads_fake_client() -> None: + """``broker.fetch_unprocessed`` in test mode reads the in-memory store, not SQLAlchemy.""" broker = _make_broker() + test_broker = TestOutboxBroker(broker) + async with test_broker: + # No subscriber — rows stay in fake client. + await broker.publish("a", queue="q1") # ty: ignore[missing-argument] + await broker.publish("b", queue="q2") # ty: ignore[missing-argument] + + all_rows = await broker.fetch_unprocessed() # ty: ignore[missing-argument] + assert [r.queue for r in all_rows] == ["q1", "q2"] + + q1_only = await broker.fetch_unprocessed(queue="q1") # ty: ignore[missing-argument] + assert [r.queue for r in q1_only] == ["q1"] + + +async def test_fake_broker_router_subscriber_receives_publish() -> None: received: list[str] = [] + router = OutboxRouter() - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @router.subscriber("orders") async def handle(body: str) -> None: received.append(body) + broker = _make_broker() + broker.include_router(router) + test_broker = TestOutboxBroker(broker) async with test_broker: - future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300) - await broker.publish("at-future", queue="orders", activate_at=future) # ty: ignore[missing-argument] - await asyncio.sleep(0.1) - assert received == [] - await _wait_until(lambda: received, timeout=2.0) - assert received == ["at-future"] + await broker.publish("via-router", queue="orders") # ty: ignore[missing-argument] + assert received == ["via-router"] + + +async def test_fake_broker_publish_invokes_flush_terminal_when_lease_lost() -> None: + """``delete_with_lease`` returning False is logged and skipped, not raised.""" + + class LeaseLostClient(FakeOutboxClient): + async def delete_with_lease(self, message_id: int, acquired_token: uuid.UUID) -> bool: # noqa: ARG002 + return False -async def test_fake_broker_publish_batch_with_activate_in_delays_delivery() -> None: broker = _make_broker() received: list[str] = [] - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber("orders") async def handle(body: str) -> None: received.append(body) test_broker = TestOutboxBroker(broker) + test_broker.fake_client = LeaseLostClient() async with test_broker: - await broker.publish_batch( # ty: ignore[missing-argument] - "a", - "b", - queue="orders", - activate_in=_dt.timedelta(milliseconds=300), - ) - await asyncio.sleep(0.1) - assert received == [] - await _wait_until(lambda: len(received) == 2, timeout=2.0) - assert sorted(received) == ["a", "b"] + await broker.publish("lease-lost", queue="orders") # ty: ignore[missing-argument] + + assert received == ["lease-lost"] -async def test_fake_broker_publish_batch_with_activate_at_delays_delivery() -> None: +async def test_fake_broker_publish_invokes_flush_retry_when_lease_lost() -> None: + """``mark_pending_with_lease`` returning False is logged and skipped on a nacked handler.""" + + class LeaseLostRetryClient(FakeOutboxClient): + async def mark_pending_with_lease(self, *args: object, **kwargs: object) -> bool: # noqa: ARG002 + return False + + broker = _make_broker() + attempts: list[str] = [] + + @broker.subscriber("orders", retry_strategy=ConstantRetry(delay_seconds=0.05, max_attempts=10)) + async def handle(body: str) -> None: + attempts.append(body) + msg = "always fails" + raise RuntimeError(msg) + + test_broker = TestOutboxBroker(broker) + test_broker.fake_client = LeaseLostRetryClient() + async with test_broker: + await broker.publish("never-cleared", queue="orders") # ty: ignore[missing-argument] + + assert attempts == ["never-cleared"] + + +async def test_fake_broker_publish_swallows_post_consume_failure() -> None: + """``dispatch_one``'s outer except catches a delete that raises, so the next publish still works.""" + + class RaisingDeleteClient(FakeOutboxClient): + def __init__(self) -> None: + super().__init__() + self.calls = 0 + + async def delete_with_lease(self, message_id: int, acquired_token: uuid.UUID) -> bool: + self.calls += 1 + if self.calls == 1: + msg = "delete blew up" + raise RuntimeError(msg) + return await super().delete_with_lease(message_id, acquired_token) + broker = _make_broker() received: list[str] = [] - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + @broker.subscriber("orders") async def handle(body: str) -> None: received.append(body) test_broker = TestOutboxBroker(broker) + raising = RaisingDeleteClient() + test_broker.fake_client = raising async with test_broker: - future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300) - await broker.publish_batch("a", "b", queue="orders", activate_at=future) # ty: ignore[missing-argument] - await asyncio.sleep(0.1) - assert received == [] - await _wait_until(lambda: len(received) == 2, timeout=2.0) - assert sorted(received) == ["a", "b"] + await broker.publish("first-fails-on-delete", queue="orders") # ty: ignore[missing-argument] + await broker.publish("second-ok", queue="orders") # ty: ignore[missing-argument] + assert raising.calls >= 2 + assert received == ["first-fails-on-delete", "second-ok"] + + +async def test_fake_broker_retry_strategy_receives_handler_exception() -> None: + """RetryStrategyProto.get_next_attempt_delay must see the raised exception.""" + seen_exceptions: list[BaseException | None] = [] + + class RecordingStrategy(RetryStrategyProto): + def get_next_attempt_delay( + self, + *, + first_attempt_at: _dt.datetime, # noqa: ARG002 + last_attempt_at: _dt.datetime, # noqa: ARG002 + attempts_count: int, # noqa: ARG002 + exception: BaseException | None = None, + ) -> float | None: + seen_exceptions.append(exception) + return None # terminal -async def test_fake_broker_publish_batch_rejects_both_activate_in_and_activate_at() -> None: broker = _make_broker() + + @broker.subscriber("orders", retry_strategy=RecordingStrategy()) + async def handle(body: str) -> None: + del body + msg = "boom-transient" + raise RuntimeError(msg) + test_broker = TestOutboxBroker(broker) async with test_broker: - with pytest.raises(ValueError, match="at most one of activate_in / activate_at"): - await broker.publish_batch( # ty: ignore[missing-argument] - "x", - queue="orders", - activate_in=_dt.timedelta(seconds=1), - activate_at=_dt.datetime.now(tz=_dt.UTC), - ) + await broker.publish("payload", queue="orders") # ty: ignore[missing-argument] + assert len(seen_exceptions) == 1 + exc = seen_exceptions[0] + assert isinstance(exc, RuntimeError) + assert str(exc) == "boom-transient" -async def test_fake_broker_publish_batch_empty_bodies_is_noop() -> None: + +# --- Loop-mode tests (run_loops=True) ------------------------------------------------- + + +async def test_loop_mode_failing_handler_with_retry_reschedules() -> None: broker = _make_broker() - test_broker = TestOutboxBroker(broker) + attempts: list[str] = [] + + @broker.subscriber( + "orders", + min_fetch_interval=0.01, + max_fetch_interval=0.05, + retry_strategy=ConstantRetry(delay_seconds=0.05, max_attempts=3), + ) + async def handle(body: str) -> None: + attempts.append(body) + if len(attempts) < 3: + msg = "transient" + raise RuntimeError(msg) + + test_broker = TestOutboxBroker(broker, run_loops=True) async with test_broker: - await broker.publish_batch(queue="orders") # ty: ignore[missing-argument] - assert test_broker.fake_client.rows == [] + p, h = encode_payload("retry-me") + test_broker.feed("orders", p, headers=h) + await _wait_until(lambda: len(attempts) == 3, timeout=3.0) + await _wait_until(lambda: not test_broker.fake_client.rows, timeout=1.0) + + +async def test_loop_mode_max_deliveries_drops_row() -> None: + broker = _make_broker() + + @broker.subscriber( + "orders", + min_fetch_interval=0.01, + max_fetch_interval=0.05, + max_deliveries=1, + ) + async def handle(body: str) -> None: ... + + test_broker = TestOutboxBroker(broker, run_loops=True) + async with test_broker: + p, h = encode_payload("never") + test_broker.feed("orders", p, headers=h) + # Pre-bump deliveries_count so allow_delivery rejects on first claim — handler + # never runs, the row is just dropped. + test_broker.fake_client.rows[0].deliveries_count = 5 + await _wait_until(lambda: not test_broker.fake_client.rows, timeout=2.0) + + +async def test_loop_mode_expired_lease_is_reclaimed() -> None: + broker = _make_broker() + received: list[str] = [] + + @broker.subscriber( + "orders", + min_fetch_interval=0.01, + max_fetch_interval=0.05, + lease_ttl_seconds=0.1, + ) + async def handle(body: str) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker, run_loops=True) + async with test_broker: + # Manually create a row with an expired lease — fetch must reclaim it. + old = _dt.datetime.now(tz=_dt.UTC) - _dt.timedelta(seconds=10) + test_broker.fake_client._rows.append( # noqa: SLF001 + _FakeRow( + id=99, + queue="orders", + payload=encode_payload("stuck-payload")[0], + headers=encode_payload("stuck-payload")[1], + acquired_at=old, + acquired_token=uuid.uuid4(), + ), + ) + test_broker.fake_client._next_id = 100 # noqa: SLF001 + await _wait_until(lambda: received, timeout=3.0) + +async def test_loop_mode_delays_delivery_by_next_attempt_at() -> None: + broker = _make_broker() + received: list[str] = [] -# --- subscriber error paths via subclassed FakeOutboxClient --- + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + test_broker = TestOutboxBroker(broker, run_loops=True) + async with test_broker: + future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300) + test_broker.feed("orders", b'"delayed"', next_attempt_at=future, headers={"content-type": "application/json"}) + # Before the gate opens: nothing delivered. + await asyncio.sleep(0.1) + assert received == [] + # After the gate opens (and at least one fetch tick): delivered. + await _wait_until(lambda: received, timeout=2.0) + assert received == ["delayed"] -async def test_fetch_loop_recovers_from_client_error() -> None: - from faststream_outbox.testing import FakeOutboxClient # noqa: PLC0415 +async def test_loop_mode_fetch_loop_recovers_from_client_error() -> None: class FlakyFetchClient(FakeOutboxClient): def __init__(self) -> None: super().__init__() @@ -422,7 +553,7 @@ async def fetch(self, queues, *, limit, lease_ttl_seconds): async def handle(body: str) -> None: received.append(body) - test_broker = TestOutboxBroker(broker) + test_broker = TestOutboxBroker(broker, run_loops=True) test_broker.fake_client = FlakyFetchClient() async with test_broker: p, h = encode_payload("after-error") @@ -430,7 +561,7 @@ async def handle(body: str) -> None: await _wait_until(lambda: received, timeout=5.0) -async def test_fetch_loop_backs_off_when_inflight_full() -> None: +async def test_loop_mode_fetch_loop_backs_off_when_inflight_full() -> None: broker = _make_broker() handler_started = asyncio.Event() release_handler = asyncio.Event() @@ -448,146 +579,28 @@ async def handle(body: str) -> None: await release_handler.wait() received.append(body) - test_broker = TestOutboxBroker(broker) + test_broker = TestOutboxBroker(broker, run_loops=True) async with test_broker: p, h = encode_payload("first") test_broker.feed("orders", p, headers=h) await asyncio.wait_for(handler_started.wait(), timeout=2.0) - # Now feed a second row while the worker is busy. The fetch loop must - # see inflight queue full (free <= 0) and take the short-sleep branch. + # Second feed while worker is busy: fetch loop sees inflight queue full, + # takes the short-sleep branch. p2, h2 = encode_payload("second") test_broker.feed("orders", p2, headers=h2) - await asyncio.sleep(0.1) # let the fetch loop spin against a full queue + await asyncio.sleep(0.1) release_handler.set() await _wait_until(lambda: len(received) == 2, timeout=5.0) -async def test_subscriber_with_no_handler_skips_loop_setup() -> None: - """Calling subscriber.start() with no handler attached early-returns; no loops spawn.""" - from faststream_outbox.subscriber.factory import create_subscriber # noqa: PLC0415 - - metadata = MetaData() - t = make_outbox_table(metadata) - broker = OutboxBroker(outbox_table=t) - sub = create_subscriber( - queues=["empty-queue"], - max_workers=1, - retry_strategy=None, - fetch_batch_size=1, - min_fetch_interval=1.0, - max_fetch_interval=10.0, - lease_ttl_seconds=60.0, - max_deliveries=None, - config=broker.config.broker_config, # type: ignore[arg-type] - ) - broker._subscribers.add(sub) # noqa: SLF001 # ty: ignore[unresolved-attribute] - async with TestOutboxBroker(broker): - # Inside the test broker the logger is wired; call start() directly so the - # ``if not self.calls: return`` branch fires (no add_call() was performed). - await sub.start() - # No tasks added because the early-return short-circuits before add_task calls. - assert sub.tasks == [] or all(t.done() for t in sub.tasks) - - -async def test_flush_terminal_when_lease_lost_logs_and_skips() -> None: - """Worker tries to DELETE but acquired_token no longer matches → log and skip.""" - from faststream_outbox.testing import FakeOutboxClient # noqa: PLC0415 - - class LeaseLostClient(FakeOutboxClient): - async def delete_with_lease(self, message_id, acquired_token): # noqa: ARG002 - return False # always pretend the lease is gone - - broker = _make_broker() - received: list[str] = [] - - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) - async def handle(body: str) -> None: - received.append(body) - - test_broker = TestOutboxBroker(broker) - test_broker.fake_client = LeaseLostClient() - async with test_broker: - p, h = encode_payload("lease-lost") - test_broker.feed("orders", p, headers=h) - await _wait_until(lambda: received, timeout=5.0) - - -async def test_flush_retry_when_lease_lost_logs_and_skips() -> None: - """Handler nacks; mark_pending_with_lease returns False → log and skip.""" - from faststream_outbox.testing import FakeOutboxClient # noqa: PLC0415 - - class LeaseLostRetryClient(FakeOutboxClient): - async def mark_pending_with_lease(self, *args, **kwargs): # noqa: ARG002 - return False - - broker = _make_broker() - attempts: list[str] = [] - - @broker.subscriber( - "orders", - min_fetch_interval=0.01, - max_fetch_interval=0.05, - retry_strategy=ConstantRetry(delay_seconds=0.05, max_attempts=10), - ) - async def handle(body: str) -> None: - attempts.append(body) - msg = "always fails" - raise RuntimeError(msg) - - test_broker = TestOutboxBroker(broker) - test_broker.fake_client = LeaseLostRetryClient() - async with test_broker: - p, h = encode_payload("never-cleared") - test_broker.feed("orders", p, headers=h) - await _wait_until(lambda: attempts, timeout=5.0) - - -async def test_worker_outer_except_catches_post_consume_failure() -> None: - """If the post-consume terminal write raises, the outer worker except logs and the loop survives.""" - from faststream_outbox.testing import FakeOutboxClient # noqa: PLC0415 - - class RaisingDeleteClient(FakeOutboxClient): - def __init__(self) -> None: - super().__init__() - self.calls = 0 - - async def delete_with_lease(self, message_id, acquired_token): - self.calls += 1 - if self.calls == 1: - msg = "delete blew up" - raise RuntimeError(msg) - return await super().delete_with_lease(message_id, acquired_token) - - broker = _make_broker() - received: list[str] = [] - - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) - async def handle(body: str) -> None: - received.append(body) - - test_broker = TestOutboxBroker(broker) - raising = RaisingDeleteClient() - test_broker.fake_client = raising - async with test_broker: - p1, h1 = encode_payload("first-fails-on-delete") - test_broker.feed("orders", p1, headers=h1) - # First delete raises → outer except catches; subscriber loop survives. - # Second message proves the loop didn't die. - await _wait_until(lambda: raising.calls >= 1, timeout=2.0) - p2, h2 = encode_payload("second-ok") - test_broker.feed("orders", p2, headers=h2) - await _wait_until(lambda: len(received) == 2, timeout=5.0) - - -async def test_flush_with_no_lease_token_is_noop() -> None: - """If acquired_token is somehow None (defensive), _flush_terminal early-returns.""" - from faststream_outbox.testing import FakeOutboxClient # noqa: PLC0415 +async def test_loop_mode_flush_with_no_lease_token_is_noop() -> None: + """Fetch strips the lease token → _flush_terminal early-returns. Loop-only path.""" class TokenStrippingClient(FakeOutboxClient): async def fetch(self, queues, *, limit, lease_ttl_seconds): rows = await super().fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds) for row in rows: - row.acquired_token = None # strip the lease + row.acquired_token = None return rows broker = _make_broker() @@ -597,7 +610,7 @@ async def fetch(self, queues, *, limit, lease_ttl_seconds): async def handle(body: str) -> None: received.append(body) - test_broker = TestOutboxBroker(broker) + test_broker = TestOutboxBroker(broker, run_loops=True) test_broker.fake_client = TokenStrippingClient() async with test_broker: p, h = encode_payload("no-token") @@ -605,10 +618,7 @@ async def handle(body: str) -> None: await _wait_until(lambda: received, timeout=5.0) -async def test_flush_retry_with_no_lease_token_is_noop() -> None: - """If acquired_token is None and the handler nacks, _flush_retry early-returns.""" - from faststream_outbox.testing import FakeOutboxClient # noqa: PLC0415 - +async def test_loop_mode_flush_retry_with_no_lease_token_is_noop() -> None: class TokenStrippingClient(FakeOutboxClient): async def fetch(self, queues, *, limit, lease_ttl_seconds): rows = await super().fetch(queues, limit=limit, lease_ttl_seconds=lease_ttl_seconds) @@ -630,7 +640,7 @@ async def handle(body: str) -> None: msg = "fail" raise RuntimeError(msg) - test_broker = TestOutboxBroker(broker) + test_broker = TestOutboxBroker(broker, run_loops=True) test_broker.fake_client = TokenStrippingClient() async with test_broker: p, h = encode_payload("retry-no-token") @@ -638,55 +648,7 @@ async def handle(body: str) -> None: await _wait_until(lambda: attempts, timeout=5.0) -async def test_fake_connect_is_noop() -> None: - broker = _make_broker() - test_broker = TestOutboxBroker(broker) - # Direct call exercises L226 even though it's also called during __aenter__. - await test_broker._fake_connect(broker) # noqa: SLF001 - - -async def test_retry_strategy_receives_handler_exception() -> None: - """RetryStrategyProto.get_next_attempt_delay must see the raised exception, not None.""" - seen_exceptions: list[BaseException | None] = [] - - class RecordingStrategy(RetryStrategyProto): - def get_next_attempt_delay( - self, - *, - first_attempt_at: _dt.datetime, # noqa: ARG002 - last_attempt_at: _dt.datetime, # noqa: ARG002 - attempts_count: int, # noqa: ARG002 - exception: BaseException | None = None, - ) -> float | None: - seen_exceptions.append(exception) - return None # terminal so the test wraps up promptly - - broker = _make_broker() - - @broker.subscriber( - "orders", - min_fetch_interval=0.01, - max_fetch_interval=0.05, - retry_strategy=RecordingStrategy(), - ) - async def handle(body: str) -> None: - del body - msg = "boom-transient" - raise RuntimeError(msg) - - test_broker = TestOutboxBroker(broker) - async with test_broker: - p, h = encode_payload("payload") - test_broker.feed("orders", p, headers=h) - await _wait_until(lambda: seen_exceptions, timeout=3.0) - - assert len(seen_exceptions) >= 1 - exc = seen_exceptions[0] - assert isinstance(exc, RuntimeError) - assert str(exc) == "boom-transient" - - -async def test_retry_strategy_can_branch_on_exception_type() -> None: +async def test_loop_mode_retry_strategy_can_branch_on_exception_type() -> None: """Subclass pattern from retry.py docstring: retry transient, terminate on permanent.""" attempts: list[str] = [] @@ -713,43 +675,98 @@ def get_next_attempt_delay( ) async def handle(body: str) -> None: attempts.append(body) - # First call: transient (gets retried via the strategy's retry branch). - # Second call: permanent (terminates via the strategy's None branch). if len(attempts) == 1: msg = "transient" raise RuntimeError(msg) msg = "permanent" raise ValueError(msg) - test_broker = TestOutboxBroker(broker) + test_broker = TestOutboxBroker(broker, run_loops=True) async with test_broker: p, h = encode_payload("body") test_broker.feed("orders", p, headers=h) await _wait_until(lambda: not test_broker.fake_client.rows, timeout=5.0) - assert len(attempts) == 2 # transient retried once, then permanent terminated + assert len(attempts) == 2 -async def test_router_subscriber_receives_plain_queue_publish() -> None: - """A subscriber registered via OutboxRouter must receive rows whose queue matches literally.""" - received: list[str] = [] +async def test_subscriber_with_no_handler_skips_loop_setup() -> None: + """Calling subscriber.start() with no handler attached early-returns; no loops spawn.""" + from faststream_outbox.subscriber.factory import create_subscriber # noqa: PLC0415 - router = OutboxRouter() + metadata = MetaData() + t = make_outbox_table(metadata) + broker = OutboxBroker(outbox_table=t) + sub = create_subscriber( + queues=["empty-queue"], + max_workers=1, + retry_strategy=None, + fetch_batch_size=1, + min_fetch_interval=1.0, + max_fetch_interval=10.0, + lease_ttl_seconds=60.0, + max_deliveries=None, + config=broker.config.broker_config, # type: ignore[arg-type] + ) + broker._subscribers.add(sub) # noqa: SLF001 # ty: ignore[unresolved-attribute] + async with TestOutboxBroker(broker, run_loops=True): + # Inside the test broker the logger is wired; call start() directly so the + # ``if not self.calls: return`` branch fires (no add_call() was performed). + await sub.start() + assert sub.tasks == [] or all(t.done() for t in sub.tasks) + + +async def test_sync_publish_skips_callless_subscriber() -> None: + """``_find_subscriber_for_queue`` must skip subscribers that have no registered call.""" + from faststream_outbox.subscriber.factory import create_subscriber # noqa: PLC0415 + + metadata = MetaData() + t = make_outbox_table(metadata) + broker = OutboxBroker(outbox_table=t) + # Add a call-less subscriber for "orphans" — sync-dispatch's lookup must skip it + # so the row stays in the fake client instead of crashing on a missing handler. + sub = create_subscriber( + queues=["orphans"], + max_workers=1, + retry_strategy=None, + fetch_batch_size=1, + min_fetch_interval=1.0, + max_fetch_interval=10.0, + lease_ttl_seconds=60.0, + max_deliveries=None, + config=broker.config.broker_config, # type: ignore[arg-type] + ) + broker._subscribers.add(sub) # noqa: SLF001 # ty: ignore[unresolved-attribute] + + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish("ignored", queue="orphans") # ty: ignore[missing-argument] + + assert len(test_broker.fake_client.rows) == 1 - @router.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) - async def handle(body: str) -> None: - received.append(body) +# --- TestOutboxBroker plumbing -------------------------------------------------------- + + +async def test_fake_connect_is_noop() -> None: broker = _make_broker() - broker.include_router(router) + test_broker = TestOutboxBroker(broker) + await test_broker._fake_connect(broker) # noqa: SLF001 + +async def test_test_broker_feed_forwards_timer_id() -> None: + broker = _make_broker() test_broker = TestOutboxBroker(broker) async with test_broker: - p, h = encode_payload("via-router") - test_broker.feed("orders", p, headers=h) - await _wait_until(lambda: received, timeout=3.0) + first = test_broker.feed("orders", b"x", timer_id="email-1") + second = test_broker.feed("orders", b"y", timer_id="email-1") + assert first is not None + assert second is None + assert len(test_broker.fake_client.rows) == 1 + assert test_broker.fake_client.rows[0].timer_id == "email-1" - assert received == ["via-router"] + +# --- FakeOutboxClient unit tests ------------------------------------------------------ async def test_fake_client_fetch_with_conn_mirrors_fetch() -> None: @@ -768,13 +785,12 @@ async def test_fake_client_feed_timer_id_dedup() -> None: first = fake.feed(queue="q", payload=b"x", timer_id="email-1") second = fake.feed(queue="q", payload=b"y", timer_id="email-1") assert first is not None - assert second is None # second feed is a no-op + assert second is None assert len(fake.rows) == 1 - assert fake.rows[0].payload == b"x" # first wins + assert fake.rows[0].payload == b"x" async def test_fake_client_feed_timer_id_different_queues_allowed() -> None: - """Unique-on-(queue, timer_id): same timer_id in a different queue is independent.""" fake = FakeOutboxClient() a = fake.feed(queue="q1", payload=b"x", timer_id="email-1") b = fake.feed(queue="q2", payload=b"y", timer_id="email-1") @@ -806,41 +822,7 @@ async def test_fake_client_cancel_timer_unknown_returns_false() -> None: async def test_fake_client_cancel_timer_skips_leased_row() -> None: fake = FakeOutboxClient() fake.feed(queue="q", payload=b"x", timer_id="email-1") - # Simulate the row having been claimed by a worker. fake.rows[0].acquired_token = uuid.uuid4() fake.rows[0].acquired_at = _dt.datetime.now(tz=_dt.UTC) assert await fake.cancel_timer(queue="q", timer_id="email-1") is False - assert len(fake.rows) == 1 # row still there - - -async def test_test_broker_feed_forwards_timer_id() -> None: - broker = _make_broker() - test_broker = TestOutboxBroker(broker) - async with test_broker: - first = test_broker.feed("orders", b"x", timer_id="email-1") - second = test_broker.feed("orders", b"y", timer_id="email-1") - assert first is not None - assert second is None - assert len(test_broker.fake_client.rows) == 1 - assert test_broker.fake_client.rows[0].timer_id == "email-1" - - -async def test_fake_broker_delays_delivery_by_next_attempt_at() -> None: - """Row fed with next_attempt_at=future should not be dispatched until the gate opens.""" - broker = _make_broker() - received: list[str] = [] - - @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) - async def handle(body: str) -> None: - received.append(body) - - test_broker = TestOutboxBroker(broker) - async with test_broker: - future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300) - test_broker.feed("orders", b'"delayed"', next_attempt_at=future, headers={"content-type": "application/json"}) - # Before the gate opens: nothing delivered. - await asyncio.sleep(0.1) - assert received == [] - # After the gate opens (and at least one fetch tick): delivered. - await _wait_until(lambda: received, timeout=2.0) - assert received == ["delayed"] + assert len(fake.rows) == 1