From 5578ffd0fc5196cbbe94c3aa72b4be8b8aef8a94 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 21 May 2026 08:46:18 +0300 Subject: [PATCH] fix: route broker.publish through fake client in TestOutboxBroker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `broker.publish` / `publish_batch` / `cancel_timer` inside `TestOutboxBroker` previously went through SQLAlchemy `session.execute`, bypassing the patched `FakeOutboxClient` — the row landed in the real DB while subscriber loops polled the in-memory fake, so handlers never fired. Patch all three methods in `_patch_broker` to route to the fake; `session` becomes optional in test mode, matching FastStream's standard test-broker idiom. Co-Authored-By: Claude Opus 4.7 (1M context) --- faststream_outbox/testing.py | 107 ++++++++++++++++++++- tests/test_fake.py | 176 ++++++++++++++++++++++++++++++++++- 2 files changed, 279 insertions(+), 4 deletions(-) diff --git a/faststream_outbox/testing.py b/faststream_outbox/testing.py index e170804..924fc70 100644 --- a/faststream_outbox/testing.py +++ b/faststream_outbox/testing.py @@ -12,10 +12,12 @@ import uuid from contextlib import contextmanager from dataclasses import dataclass, field +from unittest import mock from faststream._internal.testing.broker import TestBroker from faststream_outbox.broker import OutboxBroker +from faststream_outbox.envelope import _encode_payload from faststream_outbox.message import OutboxInnerMessage @@ -192,6 +194,98 @@ def _to_inner(row: _FakeRow) -> OutboxInnerMessage: ) +def _build_fake_publish( + fake_client: FakeOutboxClient, + serializer: typing.Any, +) -> typing.Callable[..., typing.Awaitable[int | None]]: + async def fake_publish( # noqa: PLR0913 + body: typing.Any, + *, + queue: str, + session: typing.Any = None, + headers: dict[str, str] | None = None, + correlation_id: str | None = None, + activate_in: _dt.timedelta | None = None, + activate_at: _dt.datetime | None = None, + timer_id: str | None = None, + ) -> int | None: + # session is ignored in test mode — the fake client has no transaction. + del session + if activate_in is not None and activate_at is not None: + msg = "broker.publish accepts at most one of activate_in / activate_at" + raise ValueError(msg) + payload, hdrs = _encode_payload( + body, + headers=headers, + correlation_id=correlation_id, + serializer=serializer, + ) + next_at: _dt.datetime | None = None + if activate_in is not None: + next_at = _utcnow() + activate_in + elif activate_at is not None: + next_at = activate_at + return fake_client.feed( + queue=queue, + payload=payload, + headers=hdrs, + next_attempt_at=next_at, + timer_id=timer_id, + ) + + return fake_publish + + +def _build_fake_publish_batch( + fake_client: FakeOutboxClient, + serializer: typing.Any, +) -> typing.Callable[..., typing.Awaitable[None]]: + async def fake_publish_batch( + *bodies: typing.Any, + queue: str, + session: typing.Any = None, + headers: dict[str, str] | None = None, + activate_in: _dt.timedelta | None = None, + activate_at: _dt.datetime | None = None, + ) -> None: + del session + if activate_in is not None and activate_at is not None: + msg = "broker.publish_batch accepts at most one of activate_in / activate_at" + raise ValueError(msg) + if not bodies: + return + next_at: _dt.datetime | None = None + if activate_in is not None: + next_at = _utcnow() + activate_in + elif activate_at is not None: + next_at = activate_at + for body in bodies: + payload, hdrs = _encode_payload(body, headers=headers, serializer=serializer) + fake_client.feed( + queue=queue, + payload=payload, + headers=hdrs, + next_attempt_at=next_at, + ) + + return fake_publish_batch + + +def _build_fake_cancel_timer( + fake_client: FakeOutboxClient, +) -> typing.Callable[..., typing.Awaitable[bool]]: + async def fake_cancel_timer( + *, + queue: str, + timer_id: str, + session: typing.Any = None, + ) -> bool: + del session + return await fake_client.cancel_timer(queue=queue, timer_id=timer_id) + + return fake_cancel_timer + + class TestOutboxBroker(TestBroker[OutboxBroker]): # ty: ignore[invalid-type-arguments] """Test harness that runs the real subscriber loops against an in-memory client.""" @@ -229,8 +323,19 @@ def _patch_producer(self, broker: OutboxBroker) -> "Iterator[None]": def _patch_broker(self, broker: OutboxBroker) -> "Iterator[None]": original_client = broker.config.broker_config.client broker.config.broker_config.client = self.fake_client + # Mirror real publish's serializer wiring so pydantic / dataclass bodies + # encode identically in tests. + serializer = broker.config.broker_config.fd_config._serializer # noqa: SLF001 + fake_publish = _build_fake_publish(self.fake_client, serializer) + fake_publish_batch = _build_fake_publish_batch(self.fake_client, serializer) + fake_cancel_timer = _build_fake_cancel_timer(self.fake_client) try: - with super()._patch_broker(broker): + with ( + mock.patch.object(broker, "publish", new=fake_publish), + mock.patch.object(broker, "publish_batch", new=fake_publish_batch), + mock.patch.object(broker, "cancel_timer", new=fake_cancel_timer), + super()._patch_broker(broker), + ): yield finally: broker.config.broker_config.client = original_client diff --git a/tests/test_fake.py b/tests/test_fake.py index 0787207..dd7b19d 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -219,12 +219,182 @@ async def test_fake_broker_request_raises() -> None: await broker.request(b"x") -async def test_fake_broker_publish_rejects_non_async_session() -> None: +async def test_fake_broker_publish_triggers_handler() -> None: + """ + ``broker.publish`` inside ``TestOutboxBroker`` must route to the fake client and fire the handler. + + This is the standard FastStream test-broker idiom — the same call site works in + tests as in production, with the session argument ignored in test mode. + """ + broker = _make_broker() + received: list[dict] = [] + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: dict) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish({"order_id": 1}, queue="orders") # ty: ignore[missing-argument] + await _wait_until(lambda: received, timeout=2.0) + + assert received == [{"order_id": 1}] + assert test_broker.fake_client.rows == [] # row deleted after ack + + +async def test_fake_broker_publish_batch_triggers_handler() -> None: + broker = _make_broker() + received: list[str] = [] + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish_batch("a", "b", "c", queue="orders") # ty: ignore[missing-argument] + await _wait_until(lambda: len(received) == 3, timeout=2.0) + + assert sorted(received) == ["a", "b", "c"] + + +async def test_fake_broker_publish_with_timer_id_dedups() -> None: + broker = _make_broker() + + @broker.subscriber("orders", min_fetch_interval=10.0, max_fetch_interval=10.0) + async def handle(body: str) -> None: ... + + test_broker = TestOutboxBroker(broker) + async with test_broker: + first = await broker.publish("x", queue="orders", timer_id="email-1") # ty: ignore[missing-argument] + second = await broker.publish("y", queue="orders", timer_id="email-1") # ty: ignore[missing-argument] + assert first is not None + assert second is None + assert len(test_broker.fake_client.rows) == 1 + + +async def test_fake_broker_publish_with_activate_in_delays_delivery() -> None: + broker = _make_broker() + received: list[str] = [] + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish( # ty: ignore[missing-argument] + "delayed", + queue="orders", + activate_in=_dt.timedelta(milliseconds=300), + ) + await asyncio.sleep(0.1) + assert received == [] + await _wait_until(lambda: received, timeout=2.0) + assert received == ["delayed"] + + +async def test_fake_broker_publish_rejects_both_activate_in_and_activate_at() -> None: + broker = _make_broker() + test_broker = TestOutboxBroker(broker) + async with test_broker: + with pytest.raises(ValueError, match="at most one of activate_in / activate_at"): + await broker.publish( # ty: ignore[missing-argument] + "x", + queue="orders", + activate_in=_dt.timedelta(seconds=1), + activate_at=_dt.datetime.now(tz=_dt.UTC), + ) + + +async def test_fake_broker_cancel_timer_removes_row() -> None: + broker = _make_broker() + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish("x", queue="orders", timer_id="email-1") # ty: ignore[missing-argument] + assert len(test_broker.fake_client.rows) == 1 + cancelled = await broker.cancel_timer(queue="orders", timer_id="email-1") # ty: ignore[missing-argument] + assert cancelled is True + assert test_broker.fake_client.rows == [] + + +async def test_fake_broker_publish_with_activate_at_delays_delivery() -> None: + broker = _make_broker() + received: list[str] = [] + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300) + await broker.publish("at-future", queue="orders", activate_at=future) # ty: ignore[missing-argument] + await asyncio.sleep(0.1) + assert received == [] + await _wait_until(lambda: received, timeout=2.0) + assert received == ["at-future"] + + +async def test_fake_broker_publish_batch_with_activate_in_delays_delivery() -> None: + broker = _make_broker() + received: list[str] = [] + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + await broker.publish_batch( # ty: ignore[missing-argument] + "a", + "b", + queue="orders", + activate_in=_dt.timedelta(milliseconds=300), + ) + await asyncio.sleep(0.1) + assert received == [] + await _wait_until(lambda: len(received) == 2, timeout=2.0) + assert sorted(received) == ["a", "b"] + + +async def test_fake_broker_publish_batch_with_activate_at_delays_delivery() -> None: + broker = _make_broker() + received: list[str] = [] + + @broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05) + async def handle(body: str) -> None: + received.append(body) + + test_broker = TestOutboxBroker(broker) + async with test_broker: + future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300) + await broker.publish_batch("a", "b", queue="orders", activate_at=future) # ty: ignore[missing-argument] + await asyncio.sleep(0.1) + assert received == [] + await _wait_until(lambda: len(received) == 2, timeout=2.0) + assert sorted(received) == ["a", "b"] + + +async def test_fake_broker_publish_batch_rejects_both_activate_in_and_activate_at() -> None: + broker = _make_broker() + test_broker = TestOutboxBroker(broker) + async with test_broker: + with pytest.raises(ValueError, match="at most one of activate_in / activate_at"): + await broker.publish_batch( # ty: ignore[missing-argument] + "x", + queue="orders", + activate_in=_dt.timedelta(seconds=1), + activate_at=_dt.datetime.now(tz=_dt.UTC), + ) + + +async def test_fake_broker_publish_batch_empty_bodies_is_noop() -> None: broker = _make_broker() test_broker = TestOutboxBroker(broker) async with test_broker: - with pytest.raises(TypeError, match="AsyncSession"): - await broker.publish(b"x", queue="orders", session=object()) # ty: ignore[invalid-argument-type] + await broker.publish_batch(queue="orders") # ty: ignore[missing-argument] + assert test_broker.fake_client.rows == [] # --- subscriber error paths via subclassed FakeOutboxClient ---