Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion faststream_outbox/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from unittest import mock

from faststream._internal.testing.broker import TestBroker

from faststream_outbox.broker import OutboxBroker
from faststream_outbox.envelope import _encode_payload
from faststream_outbox.message import OutboxInnerMessage


Expand Down Expand Up @@ -192,6 +194,98 @@ def _to_inner(row: _FakeRow) -> OutboxInnerMessage:
)


def _build_fake_publish(
fake_client: FakeOutboxClient,
serializer: typing.Any,
) -> typing.Callable[..., typing.Awaitable[int | None]]:
async def fake_publish( # noqa: PLR0913
body: typing.Any,
*,
queue: str,
session: typing.Any = None,
headers: dict[str, str] | None = None,
correlation_id: str | None = None,
activate_in: _dt.timedelta | None = None,
activate_at: _dt.datetime | None = None,
timer_id: str | None = None,
) -> int | None:
# session is ignored in test mode — the fake client has no transaction.
del session
if activate_in is not None and activate_at is not None:
msg = "broker.publish accepts at most one of activate_in / activate_at"
raise ValueError(msg)
payload, hdrs = _encode_payload(
body,
headers=headers,
correlation_id=correlation_id,
serializer=serializer,
)
next_at: _dt.datetime | None = None
if activate_in is not None:
next_at = _utcnow() + activate_in
elif activate_at is not None:
next_at = activate_at
return fake_client.feed(
queue=queue,
payload=payload,
headers=hdrs,
next_attempt_at=next_at,
timer_id=timer_id,
)

return fake_publish


def _build_fake_publish_batch(
fake_client: FakeOutboxClient,
serializer: typing.Any,
) -> typing.Callable[..., typing.Awaitable[None]]:
async def fake_publish_batch(
*bodies: typing.Any,
queue: str,
session: typing.Any = None,
headers: dict[str, str] | None = None,
activate_in: _dt.timedelta | None = None,
activate_at: _dt.datetime | None = None,
) -> None:
del session
if activate_in is not None and activate_at is not None:
msg = "broker.publish_batch accepts at most one of activate_in / activate_at"
raise ValueError(msg)
if not bodies:
return
next_at: _dt.datetime | None = None
if activate_in is not None:
next_at = _utcnow() + activate_in
elif activate_at is not None:
next_at = activate_at
for body in bodies:
payload, hdrs = _encode_payload(body, headers=headers, serializer=serializer)
fake_client.feed(
queue=queue,
payload=payload,
headers=hdrs,
next_attempt_at=next_at,
)

return fake_publish_batch


def _build_fake_cancel_timer(
fake_client: FakeOutboxClient,
) -> typing.Callable[..., typing.Awaitable[bool]]:
async def fake_cancel_timer(
*,
queue: str,
timer_id: str,
session: typing.Any = None,
) -> bool:
del session
return await fake_client.cancel_timer(queue=queue, timer_id=timer_id)

return fake_cancel_timer


class TestOutboxBroker(TestBroker[OutboxBroker]): # ty: ignore[invalid-type-arguments]
"""Test harness that runs the real subscriber loops against an in-memory client."""

Expand Down Expand Up @@ -229,8 +323,19 @@ def _patch_producer(self, broker: OutboxBroker) -> "Iterator[None]":
def _patch_broker(self, broker: OutboxBroker) -> "Iterator[None]":
original_client = broker.config.broker_config.client
broker.config.broker_config.client = self.fake_client
# Mirror real publish's serializer wiring so pydantic / dataclass bodies
# encode identically in tests.
serializer = broker.config.broker_config.fd_config._serializer # noqa: SLF001
fake_publish = _build_fake_publish(self.fake_client, serializer)
fake_publish_batch = _build_fake_publish_batch(self.fake_client, serializer)
fake_cancel_timer = _build_fake_cancel_timer(self.fake_client)
try:
with super()._patch_broker(broker):
with (
mock.patch.object(broker, "publish", new=fake_publish),
mock.patch.object(broker, "publish_batch", new=fake_publish_batch),
mock.patch.object(broker, "cancel_timer", new=fake_cancel_timer),
super()._patch_broker(broker),
):
yield
finally:
broker.config.broker_config.client = original_client
Expand Down
176 changes: 173 additions & 3 deletions tests/test_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,182 @@ async def test_fake_broker_request_raises() -> None:
await broker.request(b"x")


async def test_fake_broker_publish_rejects_non_async_session() -> None:
async def test_fake_broker_publish_triggers_handler() -> None:
"""
``broker.publish`` inside ``TestOutboxBroker`` must route to the fake client and fire the handler.

This is the standard FastStream test-broker idiom — the same call site works in
tests as in production, with the session argument ignored in test mode.
"""
broker = _make_broker()
received: list[dict] = []

@broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
async def handle(body: dict) -> None:
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
await broker.publish({"order_id": 1}, queue="orders") # ty: ignore[missing-argument]
await _wait_until(lambda: received, timeout=2.0)

assert received == [{"order_id": 1}]
assert test_broker.fake_client.rows == [] # row deleted after ack


async def test_fake_broker_publish_batch_triggers_handler() -> None:
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
async def handle(body: str) -> None:
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
await broker.publish_batch("a", "b", "c", queue="orders") # ty: ignore[missing-argument]
await _wait_until(lambda: len(received) == 3, timeout=2.0)

assert sorted(received) == ["a", "b", "c"]


async def test_fake_broker_publish_with_timer_id_dedups() -> None:
broker = _make_broker()

@broker.subscriber("orders", min_fetch_interval=10.0, max_fetch_interval=10.0)
async def handle(body: str) -> None: ...

test_broker = TestOutboxBroker(broker)
async with test_broker:
first = await broker.publish("x", queue="orders", timer_id="email-1") # ty: ignore[missing-argument]
second = await broker.publish("y", queue="orders", timer_id="email-1") # ty: ignore[missing-argument]
assert first is not None
assert second is None
assert len(test_broker.fake_client.rows) == 1


async def test_fake_broker_publish_with_activate_in_delays_delivery() -> None:
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
async def handle(body: str) -> None:
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
await broker.publish( # ty: ignore[missing-argument]
"delayed",
queue="orders",
activate_in=_dt.timedelta(milliseconds=300),
)
await asyncio.sleep(0.1)
assert received == []
await _wait_until(lambda: received, timeout=2.0)
assert received == ["delayed"]


async def test_fake_broker_publish_rejects_both_activate_in_and_activate_at() -> None:
broker = _make_broker()
test_broker = TestOutboxBroker(broker)
async with test_broker:
with pytest.raises(ValueError, match="at most one of activate_in / activate_at"):
await broker.publish( # ty: ignore[missing-argument]
"x",
queue="orders",
activate_in=_dt.timedelta(seconds=1),
activate_at=_dt.datetime.now(tz=_dt.UTC),
)


async def test_fake_broker_cancel_timer_removes_row() -> None:
broker = _make_broker()
test_broker = TestOutboxBroker(broker)
async with test_broker:
await broker.publish("x", queue="orders", timer_id="email-1") # ty: ignore[missing-argument]
assert len(test_broker.fake_client.rows) == 1
cancelled = await broker.cancel_timer(queue="orders", timer_id="email-1") # ty: ignore[missing-argument]
assert cancelled is True
assert test_broker.fake_client.rows == []


async def test_fake_broker_publish_with_activate_at_delays_delivery() -> None:
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
async def handle(body: str) -> None:
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300)
await broker.publish("at-future", queue="orders", activate_at=future) # ty: ignore[missing-argument]
await asyncio.sleep(0.1)
assert received == []
await _wait_until(lambda: received, timeout=2.0)
assert received == ["at-future"]


async def test_fake_broker_publish_batch_with_activate_in_delays_delivery() -> None:
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
async def handle(body: str) -> None:
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
await broker.publish_batch( # ty: ignore[missing-argument]
"a",
"b",
queue="orders",
activate_in=_dt.timedelta(milliseconds=300),
)
await asyncio.sleep(0.1)
assert received == []
await _wait_until(lambda: len(received) == 2, timeout=2.0)
assert sorted(received) == ["a", "b"]


async def test_fake_broker_publish_batch_with_activate_at_delays_delivery() -> None:
broker = _make_broker()
received: list[str] = []

@broker.subscriber("orders", min_fetch_interval=0.01, max_fetch_interval=0.05)
async def handle(body: str) -> None:
received.append(body)

test_broker = TestOutboxBroker(broker)
async with test_broker:
future = _dt.datetime.now(tz=_dt.UTC) + _dt.timedelta(milliseconds=300)
await broker.publish_batch("a", "b", queue="orders", activate_at=future) # ty: ignore[missing-argument]
await asyncio.sleep(0.1)
assert received == []
await _wait_until(lambda: len(received) == 2, timeout=2.0)
assert sorted(received) == ["a", "b"]


async def test_fake_broker_publish_batch_rejects_both_activate_in_and_activate_at() -> None:
broker = _make_broker()
test_broker = TestOutboxBroker(broker)
async with test_broker:
with pytest.raises(ValueError, match="at most one of activate_in / activate_at"):
await broker.publish_batch( # ty: ignore[missing-argument]
"x",
queue="orders",
activate_in=_dt.timedelta(seconds=1),
activate_at=_dt.datetime.now(tz=_dt.UTC),
)


async def test_fake_broker_publish_batch_empty_bodies_is_noop() -> None:
broker = _make_broker()
test_broker = TestOutboxBroker(broker)
async with test_broker:
with pytest.raises(TypeError, match="AsyncSession"):
await broker.publish(b"x", queue="orders", session=object()) # ty: ignore[invalid-argument-type]
await broker.publish_batch(queue="orders") # ty: ignore[missing-argument]
assert test_broker.fake_client.rows == []


# --- subscriber error paths via subclassed FakeOutboxClient ---
Expand Down
Loading