diff --git a/faststream_outbox/broker.py b/faststream_outbox/broker.py
index 2100466..f75af73 100644
--- a/faststream_outbox/broker.py
+++ b/faststream_outbox/broker.py
@@ -23,11 +23,11 @@
from faststream._internal.types import BrokerMiddleware, CustomCallable
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict
-from sqlalchemy import Float, bindparam, delete, func, insert, text
+from sqlalchemy import Float, bindparam, delete, func, insert, select, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
-from faststream_outbox.client import OutboxClient
+from faststream_outbox.client import OutboxClient, _row_to_message
from faststream_outbox.configs import EngineState, OutboxBrokerConfig
from faststream_outbox.envelope import _encode_payload
from faststream_outbox.message import OutboxInnerMessage
@@ -329,6 +329,31 @@ async def cancel_timer(
result = await session.execute(stmt)
return (result.rowcount or 0) > 0 # ty: ignore[unresolved-attribute]
+ async def fetch_unprocessed(
+ self,
+ *,
+ session: AsyncSession,
+ queue: str | None = None,
+ ) -> list[OutboxInnerMessage]:
+ """
+ Return outbox rows currently in the table — pending, in-flight, or future-dated.
+
+ Intended for test assertions: a successful delivery deletes the row, so anything
+ still in the table is "unprocessed". Pass *queue* to filter to a single queue;
+ omit it to return rows across all queues. Runs on the caller's session (same
+ transactional contract as :meth:`publish`); does not acquire a lease and does
+ not mutate row state, so it is safe to call alongside running subscribers.
+ """
+ if not isinstance(session, AsyncSession):
+ msg = "broker.fetch_unprocessed requires an sqlalchemy.ext.asyncio.AsyncSession"
+ raise TypeError(msg)
+ t = self._outbox_table
+ stmt = select(*t.c).order_by(t.c.id)
+ if queue is not None:
+ stmt = stmt.where(t.c.queue == queue)
+ result = await session.execute(stmt)
+ return [_row_to_message(dict(row)) for row in result.mappings().all()]
+
async def _notify(self, session: AsyncSession, queue: str) -> None:
"""
Emit ``pg_notify('outbox_
', queue)`` so listening subscribers wake immediately.
diff --git a/tests/test_integration.py b/tests/test_integration.py
index e2dbdfd..1bcc8b7 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -595,3 +595,65 @@ def _cb(_conn, _pid, _channel, payload) -> None:
await listener.close()
assert received_payloads == ["orders"]
+
+
+async def test_fetch_unprocessed_returns_all_queues(pg_engine, outbox_table) -> None:
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ await broker.publish("o-1", queue="orders", session=session)
+ await broker.publish("o-2", queue="orders", session=session)
+ await broker.publish("s-1", queue="shipments", session=session)
+
+ async with session_factory() as session:
+ rows = await broker.fetch_unprocessed(session=session)
+
+ assert [r.queue for r in rows] == ["orders", "orders", "shipments"]
+ assert [r.id for r in rows] == sorted(r.id for r in rows) # ordered by id
+
+
+async def test_fetch_unprocessed_filters_by_queue(pg_engine, outbox_table) -> None:
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ await broker.publish("o-1", queue="orders", session=session)
+ await broker.publish("s-1", queue="shipments", session=session)
+
+ async with session_factory() as session:
+ orders = await broker.fetch_unprocessed(session=session, queue="orders")
+
+ assert len(orders) == 1
+ assert orders[0].queue == "orders"
+
+
+async def test_fetch_unprocessed_includes_future_dated_rows(pg_engine, outbox_table) -> None:
+ """Future-dated rows (activate_in) are unprocessed too — fetch_unprocessed must surface them."""
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ await broker.publish("now", queue="orders", session=session)
+ await broker.publish(
+ "later",
+ queue="orders",
+ session=session,
+ activate_in=_dt.timedelta(minutes=5),
+ )
+
+ async with session_factory() as session:
+ rows = await broker.fetch_unprocessed(session=session, queue="orders")
+
+ assert len(rows) == 2
+ now = _dt.datetime.now(tz=_dt.UTC)
+ future = [r for r in rows if r.next_attempt_at > now + _dt.timedelta(minutes=1)]
+ assert len(future) == 1
+
+
+async def test_fetch_unprocessed_reads_uncommitted_writes_in_same_session(pg_engine, outbox_table) -> None:
+ """Same-session contract: a read inside the producer's open transaction sees its own writes."""
+ broker = OutboxBroker(pg_engine, outbox_table=outbox_table)
+ session_factory = async_sessionmaker(pg_engine, expire_on_commit=False)
+ async with session_factory() as session, session.begin():
+ await broker.publish("pre-commit", queue="orders", session=session)
+ rows = await broker.fetch_unprocessed(session=session)
+ assert len(rows) == 1
+ assert rows[0].queue == "orders"
diff --git a/tests/test_unit.py b/tests/test_unit.py
index f45b56a..2f7a1af 100644
--- a/tests/test_unit.py
+++ b/tests/test_unit.py
@@ -527,6 +527,46 @@ async def test_broker_cancel_timer_emits_delete_with_lease_guard() -> None:
assert params["timer_id_1"] == "email-1"
+async def test_broker_fetch_unprocessed_rejects_non_async_session() -> None:
+ broker = _make_broker()
+ with pytest.raises(TypeError, match="AsyncSession"):
+ await broker.fetch_unprocessed(session=object()) # ty: ignore[invalid-argument-type]
+
+
+def _fetch_unprocessed_session_mock() -> AsyncMock:
+ """AsyncSession mock whose ``execute().mappings().all()`` returns an empty list."""
+ session = AsyncMock(spec=AsyncSession)
+ result = MagicMock()
+ result.mappings.return_value.all.return_value = []
+ session.execute.return_value = result
+ return session
+
+
+async def test_broker_fetch_unprocessed_builds_select_all_columns() -> None:
+ broker = _make_broker()
+ session = _fetch_unprocessed_session_mock()
+ rows = await broker.fetch_unprocessed(session=session)
+ assert rows == []
+ stmt = session.execute.await_args_list[0].args[0]
+ sql = str(stmt)
+ assert "SELECT" in sql
+ assert "FROM outbox" in sql
+ assert "ORDER BY outbox.id" in sql
+ # No queue filter compiled when queue=None
+ assert "WHERE" not in sql
+
+
+async def test_broker_fetch_unprocessed_filters_by_queue() -> None:
+ broker = _make_broker()
+ session = _fetch_unprocessed_session_mock()
+ await broker.fetch_unprocessed(session=session, queue="orders")
+ stmt = session.execute.await_args_list[0].args[0]
+ sql = str(stmt)
+ assert "WHERE outbox.queue =" in sql
+ params = stmt.compile().params
+ assert params["queue_1"] == "orders"
+
+
async def test_broker_cancel_timer_returns_false_when_nothing_deleted() -> None:
broker = _make_broker()
session = AsyncMock(spec=AsyncSession)