From 3da16753f1b58f4eaa926c7da8a137fbd6bb293e Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Wed, 20 May 2026 20:56:15 +0300 Subject: [PATCH] feat: broker.fetch_unprocessed for test assertions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a session-based method to read every row still in the outbox table (pending, in-flight, or future-dated) — successful delivery deletes the row, so anything remaining is unprocessed. Same transactional contract as publish/cancel_timer: runs on the caller's session, no lease acquired, no row state mutated. Optional queue filter. Co-Authored-By: Claude Opus 4.7 (1M context) --- faststream_outbox/broker.py | 29 +++++++++++++++-- tests/test_integration.py | 62 +++++++++++++++++++++++++++++++++++++ tests/test_unit.py | 40 ++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 2 deletions(-) diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py index 2100466..f75af73 100644 --- a/faststream_outbox/broker.py +++ b/faststream_outbox/broker.py @@ -23,11 +23,11 @@ from faststream._internal.types import BrokerMiddleware, CustomCallable from faststream.specification.schema import BrokerSpec from faststream.specification.schema.extra import Tag, TagDict -from sqlalchemy import Float, bindparam, delete, func, insert, text +from sqlalchemy import Float, bindparam, delete, func, insert, select, text from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession -from faststream_outbox.client import OutboxClient +from faststream_outbox.client import OutboxClient, _row_to_message from faststream_outbox.configs import EngineState, OutboxBrokerConfig from faststream_outbox.envelope import _encode_payload from faststream_outbox.message import OutboxInnerMessage @@ -329,6 +329,31 @@ async def cancel_timer( result = await session.execute(stmt) return (result.rowcount or 0) > 0 # ty: ignore[unresolved-attribute] + async def fetch_unprocessed( + self, + *, + session: AsyncSession, + queue: str | None = None, + ) -> list[OutboxInnerMessage]: + """ + Return outbox rows currently in the table — pending, in-flight, or future-dated. + + Intended for test assertions: a successful delivery deletes the row, so anything + still in the table is "unprocessed". Pass *queue* to filter to a single queue; + omit it to return rows across all queues. Runs on the caller's session (same + transactional contract as :meth:`publish`); does not acquire a lease and does + not mutate row state, so it is safe to call alongside running subscribers. + """ + if not isinstance(session, AsyncSession): + msg = "broker.fetch_unprocessed requires an sqlalchemy.ext.asyncio.AsyncSession" + raise TypeError(msg) + t = self._outbox_table + stmt = select(*t.c).order_by(t.c.id) + if queue is not None: + stmt = stmt.where(t.c.queue == queue) + result = await session.execute(stmt) + return [_row_to_message(dict(row)) for row in result.mappings().all()] + async def _notify(self, session: AsyncSession, queue: str) -> None: """ Emit ``pg_notify('outbox_', queue)`` so listening subscribers wake immediately. diff --git a/tests/test_integration.py b/tests/test_integration.py index e2dbdfd..1bcc8b7 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -595,3 +595,65 @@ def _cb(_conn, _pid, _channel, payload) -> None: await listener.close() assert received_payloads == ["orders"] + + +async def test_fetch_unprocessed_returns_all_queues(pg_engine, outbox_table) -> None: + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + await broker.publish("o-1", queue="orders", session=session) + await broker.publish("o-2", queue="orders", session=session) + await broker.publish("s-1", queue="shipments", session=session) + + async with session_factory() as session: + rows = await broker.fetch_unprocessed(session=session) + + assert [r.queue for r in rows] == ["orders", "orders", "shipments"] + assert [r.id for r in rows] == sorted(r.id for r in rows) # ordered by id + + +async def test_fetch_unprocessed_filters_by_queue(pg_engine, outbox_table) -> None: + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + await broker.publish("o-1", queue="orders", session=session) + await broker.publish("s-1", queue="shipments", session=session) + + async with session_factory() as session: + orders = await broker.fetch_unprocessed(session=session, queue="orders") + + assert len(orders) == 1 + assert orders[0].queue == "orders" + + +async def test_fetch_unprocessed_includes_future_dated_rows(pg_engine, outbox_table) -> None: + """Future-dated rows (activate_in) are unprocessed too — fetch_unprocessed must surface them.""" + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + await broker.publish("now", queue="orders", session=session) + await broker.publish( + "later", + queue="orders", + session=session, + activate_in=_dt.timedelta(minutes=5), + ) + + async with session_factory() as session: + rows = await broker.fetch_unprocessed(session=session, queue="orders") + + assert len(rows) == 2 + now = _dt.datetime.now(tz=_dt.UTC) + future = [r for r in rows if r.next_attempt_at > now + _dt.timedelta(minutes=1)] + assert len(future) == 1 + + +async def test_fetch_unprocessed_reads_uncommitted_writes_in_same_session(pg_engine, outbox_table) -> None: + """Same-session contract: a read inside the producer's open transaction sees its own writes.""" + broker = OutboxBroker(pg_engine, outbox_table=outbox_table) + session_factory = async_sessionmaker(pg_engine, expire_on_commit=False) + async with session_factory() as session, session.begin(): + await broker.publish("pre-commit", queue="orders", session=session) + rows = await broker.fetch_unprocessed(session=session) + assert len(rows) == 1 + assert rows[0].queue == "orders" diff --git a/tests/test_unit.py b/tests/test_unit.py index f45b56a..2f7a1af 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -527,6 +527,46 @@ async def test_broker_cancel_timer_emits_delete_with_lease_guard() -> None: assert params["timer_id_1"] == "email-1" +async def test_broker_fetch_unprocessed_rejects_non_async_session() -> None: + broker = _make_broker() + with pytest.raises(TypeError, match="AsyncSession"): + await broker.fetch_unprocessed(session=object()) # ty: ignore[invalid-argument-type] + + +def _fetch_unprocessed_session_mock() -> AsyncMock: + """AsyncSession mock whose ``execute().mappings().all()`` returns an empty list.""" + session = AsyncMock(spec=AsyncSession) + result = MagicMock() + result.mappings.return_value.all.return_value = [] + session.execute.return_value = result + return session + + +async def test_broker_fetch_unprocessed_builds_select_all_columns() -> None: + broker = _make_broker() + session = _fetch_unprocessed_session_mock() + rows = await broker.fetch_unprocessed(session=session) + assert rows == [] + stmt = session.execute.await_args_list[0].args[0] + sql = str(stmt) + assert "SELECT" in sql + assert "FROM outbox" in sql + assert "ORDER BY outbox.id" in sql + # No queue filter compiled when queue=None + assert "WHERE" not in sql + + +async def test_broker_fetch_unprocessed_filters_by_queue() -> None: + broker = _make_broker() + session = _fetch_unprocessed_session_mock() + await broker.fetch_unprocessed(session=session, queue="orders") + stmt = session.execute.await_args_list[0].args[0] + sql = str(stmt) + assert "WHERE outbox.queue =" in sql + params = stmt.compile().params + assert params["queue_1"] == "orders" + + async def test_broker_cancel_timer_returns_false_when_nothing_deleted() -> None: broker = _make_broker() session = AsyncMock(spec=AsyncSession)