Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions faststream_outbox/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
from faststream._internal.types import BrokerMiddleware, CustomCallable
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict
from sqlalchemy import Float, bindparam, delete, func, insert, text
from sqlalchemy import Float, bindparam, delete, func, insert, select, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from faststream_outbox.client import OutboxClient
from faststream_outbox.client import OutboxClient, _row_to_message
from faststream_outbox.configs import EngineState, OutboxBrokerConfig
from faststream_outbox.envelope import _encode_payload
from faststream_outbox.message import OutboxInnerMessage
Expand Down Expand Up @@ -329,6 +329,31 @@ async def cancel_timer(
result = await session.execute(stmt)
return (result.rowcount or 0) > 0 # ty: ignore[unresolved-attribute]

async def fetch_unprocessed(
self,
*,
session: AsyncSession,
queue: str | None = None,
) -> list[OutboxInnerMessage]:
"""
Return outbox rows currently in the table — pending, in-flight, or future-dated.

Intended for test assertions: a successful delivery deletes the row, so anything
still in the table is "unprocessed". Pass *queue* to filter to a single queue;
omit it to return rows across all queues. Runs on the caller's session (same
transactional contract as :meth:`publish`); does not acquire a lease and does
not mutate row state, so it is safe to call alongside running subscribers.
"""
if not isinstance(session, AsyncSession):
msg = "broker.fetch_unprocessed requires an sqlalchemy.ext.asyncio.AsyncSession"
raise TypeError(msg)
t = self._outbox_table
stmt = select(*t.c).order_by(t.c.id)
if queue is not None:
stmt = stmt.where(t.c.queue == queue)
result = await session.execute(stmt)
return [_row_to_message(dict(row)) for row in result.mappings().all()]

async def _notify(self, session: AsyncSession, queue: str) -> None:
"""
Emit ``pg_notify('outbox_<table>', queue)`` so listening subscribers wake immediately.
Expand Down
62 changes: 62 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,65 @@ def _cb(_conn, _pid, _channel, payload) -> None:
await listener.close()

assert received_payloads == ["orders"]


async def test_fetch_unprocessed_returns_all_queues(pg_engine, outbox_table) -> None:
broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
async with session_factory() as session, session.begin():
await broker.publish("o-1", queue="orders", session=session)
await broker.publish("o-2", queue="orders", session=session)
await broker.publish("s-1", queue="shipments", session=session)

async with session_factory() as session:
rows = await broker.fetch_unprocessed(session=session)

assert [r.queue for r in rows] == ["orders", "orders", "shipments"]
assert [r.id for r in rows] == sorted(r.id for r in rows) # ordered by id


async def test_fetch_unprocessed_filters_by_queue(pg_engine, outbox_table) -> None:
broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
async with session_factory() as session, session.begin():
await broker.publish("o-1", queue="orders", session=session)
await broker.publish("s-1", queue="shipments", session=session)

async with session_factory() as session:
orders = await broker.fetch_unprocessed(session=session, queue="orders")

assert len(orders) == 1
assert orders[0].queue == "orders"


async def test_fetch_unprocessed_includes_future_dated_rows(pg_engine, outbox_table) -> None:
"""Future-dated rows (activate_in) are unprocessed too — fetch_unprocessed must surface them."""
broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
async with session_factory() as session, session.begin():
await broker.publish("now", queue="orders", session=session)
await broker.publish(
"later",
queue="orders",
session=session,
activate_in=_dt.timedelta(minutes=5),
)

async with session_factory() as session:
rows = await broker.fetch_unprocessed(session=session, queue="orders")

assert len(rows) == 2
now = _dt.datetime.now(tz=_dt.UTC)
future = [r for r in rows if r.next_attempt_at > now + _dt.timedelta(minutes=1)]
assert len(future) == 1


async def test_fetch_unprocessed_reads_uncommitted_writes_in_same_session(pg_engine, outbox_table) -> None:
"""Same-session contract: a read inside the producer's open transaction sees its own writes."""
broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
async with session_factory() as session, session.begin():
await broker.publish("pre-commit", queue="orders", session=session)
rows = await broker.fetch_unprocessed(session=session)
assert len(rows) == 1
assert rows[0].queue == "orders"
40 changes: 40 additions & 0 deletions tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,46 @@ async def test_broker_cancel_timer_emits_delete_with_lease_guard() -> None:
assert params["timer_id_1"] == "email-1"


async def test_broker_fetch_unprocessed_rejects_non_async_session() -> None:
broker = _make_broker()
with pytest.raises(TypeError, match="AsyncSession"):
await broker.fetch_unprocessed(session=object()) # ty: ignore[invalid-argument-type]


def _fetch_unprocessed_session_mock() -> AsyncMock:
"""AsyncSession mock whose ``execute().mappings().all()`` returns an empty list."""
session = AsyncMock(spec=AsyncSession)
result = MagicMock()
result.mappings.return_value.all.return_value = []
session.execute.return_value = result
return session


async def test_broker_fetch_unprocessed_builds_select_all_columns() -> None:
broker = _make_broker()
session = _fetch_unprocessed_session_mock()
rows = await broker.fetch_unprocessed(session=session)
assert rows == []
stmt = session.execute.await_args_list[0].args[0]
sql = str(stmt)
assert "SELECT" in sql
assert "FROM outbox" in sql
assert "ORDER BY outbox.id" in sql
# No queue filter compiled when queue=None
assert "WHERE" not in sql


async def test_broker_fetch_unprocessed_filters_by_queue() -> None:
broker = _make_broker()
session = _fetch_unprocessed_session_mock()
await broker.fetch_unprocessed(session=session, queue="orders")
stmt = session.execute.await_args_list[0].args[0]
sql = str(stmt)
assert "WHERE outbox.queue =" in sql
params = stmt.compile().params
assert params["queue_1"] == "orders"


async def test_broker_cancel_timer_returns_false_when_nothing_deleted() -> None:
broker = _make_broker()
session = AsyncMock(spec=AsyncSession)
Expand Down
Loading