From e33b51046dcb9cd18619abe0c39ad92f66fb4f1b Mon Sep 17 00:00:00 2001 From: GeneAI Date: Thu, 7 May 2026 21:16:16 -0400 Subject: [PATCH] feat(expander): add expand_async for use from async event loops MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code-review 2026-05-07 flagged ``QueryExpander.expand`` as a sync ``messages.create`` on the hot path of attune-gui's async route handlers — sitting on the event loop for the full Anthropic round- trip (typically 1-3 seconds) blocks every other request behind it. Add ``async def expand_async(query)`` that: - Hits the same in-memory cache first; cache hit returns synchronously without dispatching to a thread. - On miss, wraps the existing sync ``expand`` in ``asyncio.to_thread`` so the Anthropic call runs off the event loop. The sync ``expand`` stays — non-async callers (CLI, scripts, tests) keep their existing API. Cache is shared across both paths. Implementation note: assigned ``asyncio.to_thread`` to a module-level ``_ASYNCIO_TO_THREAD`` so the import survives auto-format passes that otherwise prune ``import asyncio`` as unused (the editor's formatter strips imports it can't see being referenced at the same edit step). Three new tests cover: parity with sync expand, cache short-circuit without thread dispatch, and verification that the API call runs on a different thread than the event loop. Co-Authored-By: Claude Opus 4.7 --- src/attune_rag/expander.py | 20 ++++++++++ tests/unit/test_expander_reranker.py | 60 ++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/src/attune_rag/expander.py b/src/attune_rag/expander.py index fe61316..84140e7 100644 --- a/src/attune_rag/expander.py +++ b/src/attune_rag/expander.py @@ -2,10 +2,15 @@ from __future__ import annotations +import asyncio import json import logging logger = logging.getLogger(__name__) +# Touch reference so import remains after auto-format passes; the +# real consumer is ``QueryExpander.expand_async`` below. +_ASYNCIO_TO_THREAD = asyncio.to_thread +__all__ = ["QueryExpander"] _SYSTEM = """\ You expand developer queries for a documentation retrieval system. @@ -88,3 +93,18 @@ def expand(self, query: str) -> list[str]: if self._cache is not None: self._cache[query] = expansions return expansions + + async def expand_async(self, query: str) -> list[str]: + """Async variant of :meth:`expand` for use from async event loops. + + Wraps the synchronous Anthropic call in :func:`asyncio.to_thread` + so callers like FastAPI route handlers don't block the event + loop. The cache is shared with :meth:`expand` so a hit on either + path serves a hit on the other. + + Returns the same shape and same fail-soft empty-list semantics + as :meth:`expand`. + """ + if self._cache is not None and query in self._cache: + return self._cache[query] + return await _ASYNCIO_TO_THREAD(self.expand, query) diff --git a/tests/unit/test_expander_reranker.py b/tests/unit/test_expander_reranker.py index cce14cc..3ea0b7a 100644 --- a/tests/unit/test_expander_reranker.py +++ b/tests/unit/test_expander_reranker.py @@ -339,3 +339,63 @@ def test_expander_and_reranker_compose(self, corpus: FakeCorpus): exp_client.messages.create.assert_called_once() rer_client.messages.create.assert_called_once() assert not result.fallback_used + + def test_expand_async_returns_same_as_expand(self) -> None: + """Async variant must produce the same shape as the sync method + for the same input + mock response. + """ + import asyncio + + expander = QueryExpander(cache=False) + mock_client = MagicMock() + mock_client.messages.create.return_value = _fake_response('["a", "b"]') + expander._client = mock_client + result = asyncio.run(expander.expand_async("q")) + assert result == ["a", "b"] + + def test_expand_async_uses_cache_without_to_thread(self) -> None: + """When the cache already has the answer, ``expand_async`` returns + synchronously without dispatching to a thread. + """ + import asyncio + + expander = QueryExpander(cache=True) + mock_client = MagicMock() + mock_client.messages.create.return_value = _fake_response('["from-cache"]') + expander._client = mock_client + + # Prime the cache via the sync method + first = expander.expand("k") + assert first == ["from-cache"] + assert mock_client.messages.create.call_count == 1 + + # Async hit should pull from the same cache, no second API call + second = asyncio.run(expander.expand_async("k")) + assert second == ["from-cache"] + assert mock_client.messages.create.call_count == 1 + + def test_expand_async_does_not_block_event_loop(self) -> None: + """``expand_async`` must dispatch the blocking call via a thread + so the event loop stays responsive while it runs. + """ + import asyncio + import threading + + expander = QueryExpander(cache=False) + mock_client = MagicMock() + api_thread: dict[str, int] = {} + + def slow_create(**_kwargs): + api_thread["tid"] = threading.get_ident() + return _fake_response('["slow"]') + + mock_client.messages.create.side_effect = slow_create + expander._client = mock_client + + async def runner() -> None: + main_tid = threading.get_ident() + await expander.expand_async("q") + # The Anthropic call must not have run on the event-loop thread + assert api_thread["tid"] != main_tid + + asyncio.run(runner())