From 3b828f6e12fa68835c6db8fe18fa855476770a0c Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Thu, 21 May 2026 12:58:03 +0300 Subject: [PATCH] fix: translate multi-host URLs via SQLAlchemy dialect for LISTEN MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit asyncpg.connect(dsn) failed with TargetServerAttributeNotMatched on URLs of the form ?host=h1:5432&host=h2:5432 because render_as_string URL-encodes the host:port pair into a single token asyncpg can't parse. Delegate URL → kwargs translation to engine.dialect.create_connect_args so multi-host URLs reach asyncpg as host=[...], port=[...] (same approach as the org's db-retry). Co-Authored-By: Claude Opus 4.7 (1M context) --- faststream_outbox/subscriber/usecase.py | 13 ++++--- tests/test_unit.py | 48 ++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/faststream_outbox/subscriber/usecase.py b/faststream_outbox/subscriber/usecase.py index 082c3fa..7b08ec1 100644 --- a/faststream_outbox/subscriber/usecase.py +++ b/faststream_outbox/subscriber/usecase.py @@ -236,12 +236,15 @@ async def _open_listen_connection(self, engine: "AsyncEngine") -> "_asyncpg.Conn """ if _asyncpg is None or "asyncpg" not in (engine.url.drivername or ""): return None - # SQLAlchemy URL with the +asyncpg suffix isn't a valid raw asyncpg DSN; strip it. - # ``str(url)`` hides the password — use ``render_as_string(hide_password=False)`` - # so asyncpg.connect actually sees the credentials. - dsn = engine.url.set(drivername="postgresql").render_as_string(hide_password=False) + # Delegate URL → asyncpg kwargs translation to SQLAlchemy's dialect so multi-host + # URLs (``?host=h1:5432&host=h2:5432``) become ``host=[...], port=[...]``. Round- + # tripping through ``render_as_string`` URL-encodes ``host:port`` into one token + # asyncpg can't parse and fails for failover/replica clusters. + _, opts = engine.dialect.create_connect_args(engine.url) + for sa_only_key in ("prepared_statement_cache_size", "async_fallback", "async_creator_fn"): + opts.pop(sa_only_key, None) try: - conn = await _asyncpg.connect(dsn) + conn = await _asyncpg.connect(**opts) await conn.add_listener(self._notify_channel, self._on_notify) except Exception as e: # noqa: BLE001 self._log( diff --git a/tests/test_unit.py b/tests/test_unit.py index 37dd31a..7a888e0 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -969,7 +969,10 @@ async def test_open_listen_connection_returns_none_when_asyncpg_connect_fails() sub = _make_subscriber_for_listener_test() engine = MagicMock() engine.url.drivername = "postgresql+asyncpg" - engine.url.set.return_value.render_as_string.return_value = "postgresql://u:p@h/db" + engine.dialect.create_connect_args.return_value = ( + [], + {"host": "h", "user": "u", "password": "p", "database": "db"}, + ) with ( patch.object(sub, "_log") as log_mock, @@ -983,3 +986,46 @@ async def test_open_listen_connection_returns_none_when_asyncpg_connect_fails() assert result is None log_mock.assert_called_once() assert "LISTEN setup failed" in log_mock.call_args.kwargs["message"] + + +async def test_open_listen_connection_passes_multihost_kwargs_to_asyncpg() -> None: + """ + Multi-host URLs must reach asyncpg as host/port lists, not a re-rendered DSN. + + ``?host=h1:5432&host=h2:5432`` renders back as URL-encoded host tokens asyncpg + can't parse. SQLAlchemy-only kwargs (``prepared_statement_cache_size``, ...) must + be stripped before ``asyncpg.connect``, which rejects unknown kwargs. + """ + sub = _make_subscriber_for_listener_test() + engine = MagicMock() + engine.url.drivername = "postgresql+asyncpg" + engine.dialect.create_connect_args.return_value = ( + [], + { + "host": ["h1", "h2"], + "port": [5432, 5432], + "user": "u", + "password": "p", + "database": "db", + "prepared_statement_cache_size": 100, + }, + ) + + fake_conn = MagicMock() + fake_conn.add_listener = AsyncMock() + connect_mock = AsyncMock(return_value=fake_conn) + + with ( + patch("faststream_outbox.subscriber.usecase._asyncpg.connect", new=connect_mock), + patch.object(OutboxSubscriber, "_notify_channel", new="outbox_orders"), + ): + result = await sub._open_listen_connection(engine) # noqa: SLF001 + + assert result is fake_conn + connect_mock.assert_awaited_once() + assert connect_mock.await_args is not None + kwargs = connect_mock.await_args.kwargs + assert kwargs["host"] == ["h1", "h2"] + assert kwargs["port"] == [5432, 5432] + assert "prepared_statement_cache_size" not in kwargs + fake_conn.add_listener.assert_awaited_once()