-
Notifications
You must be signed in to change notification settings - Fork 5
feat(adapters): abstract streaming layer behind LLMProvider (#548) #553
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,14 +3,16 @@ | |||||||||||||||||||||||||||||
| Provides Claude model access via the Anthropic API. | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||||
| from typing import TYPE_CHECKING, Iterator, Optional | ||||||||||||||||||||||||||||||
| from typing import TYPE_CHECKING, AsyncIterator, Iterator, Optional | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| from codeframe.adapters.llm.base import ( | ||||||||||||||||||||||||||||||
| LLMProvider, | ||||||||||||||||||||||||||||||
| LLMResponse, | ||||||||||||||||||||||||||||||
| ModelSelector, | ||||||||||||||||||||||||||||||
| Purpose, | ||||||||||||||||||||||||||||||
| StreamChunk, | ||||||||||||||||||||||||||||||
| Tool, | ||||||||||||||||||||||||||||||
| ToolCall, | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
@@ -172,6 +174,117 @@ async def async_complete( | |||||||||||||||||||||||||||||
| except APIConnectionError as exc: | ||||||||||||||||||||||||||||||
| raise LLMConnectionError(str(exc)) from exc | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def supports(self, capability: str) -> bool: | ||||||||||||||||||||||||||||||
| """Return True for capabilities this provider supports.""" | ||||||||||||||||||||||||||||||
| return capability == "extended_thinking" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| async def async_stream( | ||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||
| messages: list[dict], | ||||||||||||||||||||||||||||||
| system: str, | ||||||||||||||||||||||||||||||
| tools: list[dict], | ||||||||||||||||||||||||||||||
| model: str, | ||||||||||||||||||||||||||||||
| max_tokens: int, | ||||||||||||||||||||||||||||||
| interrupt_event: Optional[asyncio.Event] = None, | ||||||||||||||||||||||||||||||
| extended_thinking: bool = False, | ||||||||||||||||||||||||||||||
| ) -> AsyncIterator[StreamChunk]: | ||||||||||||||||||||||||||||||
| """Stream using Anthropic AsyncAnthropic SDK, yielding StreamChunk objects. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Translates Anthropic SDK events into the normalized StreamChunk format. | ||||||||||||||||||||||||||||||
| Tool inputs are collected and emitted in the final message_stop chunk | ||||||||||||||||||||||||||||||
| via tool_inputs_by_id, which is more reliable than streaming input deltas. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| When ``extended_thinking=True``, requests interleaved thinking via the | ||||||||||||||||||||||||||||||
| Anthropic betas API. The flag is silently ignored on SDK versions that | ||||||||||||||||||||||||||||||
| do not support it. | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| from anthropic import AsyncAnthropic | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if self._async_client is None: | ||||||||||||||||||||||||||||||
| self._async_client = AsyncAnthropic(api_key=self.api_key) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Convert messages to Anthropic API format (handles tool_calls/tool_results) | ||||||||||||||||||||||||||||||
| converted = self._convert_messages(messages) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| kwargs: dict = { | ||||||||||||||||||||||||||||||
| "model": model, | ||||||||||||||||||||||||||||||
| "system": system, | ||||||||||||||||||||||||||||||
| "messages": converted, | ||||||||||||||||||||||||||||||
| "tools": tools, | ||||||||||||||||||||||||||||||
| "max_tokens": max_tokens, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+209
to
+215
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Convert streamed messages the same way as completions.
🐛 Proposed fix kwargs: dict = {
"model": model,
"system": system,
- "messages": messages,
+ "messages": self._convert_messages(messages),
"tools": tools,
"max_tokens": max_tokens,
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if extended_thinking: | ||||||||||||||||||||||||||||||
| kwargs["betas"] = ["interleaved-thinking-2025-05-14"] | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| active_tool_id: Optional[str] = None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # When extended_thinking is set, the beta header may be unsupported on | ||||||||||||||||||||||||||||||
| # older SDK versions. Retry without it rather than hard-failing. | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| stream_ctx = self._async_client.messages.stream(**kwargs) | ||||||||||||||||||||||||||||||
| except Exception: # pragma: no cover | ||||||||||||||||||||||||||||||
| if extended_thinking: | ||||||||||||||||||||||||||||||
| kwargs.pop("betas", None) | ||||||||||||||||||||||||||||||
| stream_ctx = self._async_client.messages.stream(**kwargs) | ||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| async with stream_ctx as stream: | ||||||||||||||||||||||||||||||
| async for sdk_event in stream: | ||||||||||||||||||||||||||||||
| if interrupt_event and interrupt_event.is_set(): | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| event_type = sdk_event.type | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if event_type == "content_block_start": | ||||||||||||||||||||||||||||||
| block = sdk_event.content_block | ||||||||||||||||||||||||||||||
| if block.type == "tool_use": | ||||||||||||||||||||||||||||||
| active_tool_id = block.id | ||||||||||||||||||||||||||||||
| yield StreamChunk( | ||||||||||||||||||||||||||||||
| type="tool_use_start", | ||||||||||||||||||||||||||||||
| tool_id=block.id, | ||||||||||||||||||||||||||||||
| tool_name=block.name, | ||||||||||||||||||||||||||||||
| tool_input=getattr(block, "input", {}), | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| elif event_type == "content_block_delta": | ||||||||||||||||||||||||||||||
| delta = sdk_event.delta | ||||||||||||||||||||||||||||||
| if delta.type == "text_delta": | ||||||||||||||||||||||||||||||
| yield StreamChunk(type="text_delta", text=delta.text) | ||||||||||||||||||||||||||||||
| elif delta.type == "thinking_delta": | ||||||||||||||||||||||||||||||
| yield StreamChunk(type="thinking_delta", text=delta.thinking) | ||||||||||||||||||||||||||||||
| # input_json_delta: final inputs are rebuilt from message_stop | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| elif event_type == "content_block_stop": | ||||||||||||||||||||||||||||||
| if active_tool_id is not None: | ||||||||||||||||||||||||||||||
| yield StreamChunk(type="tool_use_stop") | ||||||||||||||||||||||||||||||
| active_tool_id = None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| elif event_type == "message_stop": | ||||||||||||||||||||||||||||||
| # Flush any open tool block | ||||||||||||||||||||||||||||||
| if active_tool_id is not None: | ||||||||||||||||||||||||||||||
| yield StreamChunk(type="tool_use_stop") | ||||||||||||||||||||||||||||||
| active_tool_id = None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| final_msg = await stream.get_final_message() | ||||||||||||||||||||||||||||||
| stop_reason = final_msg.stop_reason or "end_turn" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Build tool_inputs_by_id from final content blocks | ||||||||||||||||||||||||||||||
| tool_inputs_by_id: dict = {} | ||||||||||||||||||||||||||||||
| if hasattr(final_msg, "content"): | ||||||||||||||||||||||||||||||
| for block in final_msg.content: | ||||||||||||||||||||||||||||||
| if getattr(block, "type", None) == "tool_use" and hasattr(block, "id"): | ||||||||||||||||||||||||||||||
| tool_inputs_by_id[block.id] = getattr(block, "input", {}) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| yield StreamChunk( | ||||||||||||||||||||||||||||||
| type="message_stop", | ||||||||||||||||||||||||||||||
| stop_reason=stop_reason, | ||||||||||||||||||||||||||||||
| input_tokens=final_msg.usage.input_tokens, | ||||||||||||||||||||||||||||||
| output_tokens=final_msg.usage.output_tokens, | ||||||||||||||||||||||||||||||
| tool_inputs_by_id=tool_inputs_by_id, | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def stream( | ||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||
| messages: list[dict], | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,13 +4,15 @@ | |
| Supports configurable responses and call tracking. | ||
| """ | ||
|
|
||
| from typing import Callable, Iterator, Optional | ||
| import asyncio | ||
| from typing import AsyncIterator, Callable, Iterator, Optional | ||
|
|
||
| from codeframe.adapters.llm.base import ( | ||
| LLMProvider, | ||
| LLMResponse, | ||
| ModelSelector, | ||
| Purpose, | ||
| StreamChunk, | ||
| Tool, | ||
| ToolCall, | ||
| ) | ||
|
|
@@ -40,6 +42,8 @@ def __init__( | |
| self.responses: list[LLMResponse] = [] | ||
| self.response_index = 0 | ||
| self.response_handler: Optional[Callable[[list[dict]], LLMResponse]] = None | ||
| self.stream_chunks: list[list[StreamChunk]] = [] | ||
| self.stream_index = 0 | ||
|
|
||
| def add_response(self, response: LLMResponse) -> None: | ||
| """Add a canned response to the queue. | ||
|
|
@@ -175,12 +179,83 @@ def stream( | |
| for word in response.content.split(): | ||
| yield word + " " | ||
|
|
||
| def add_stream_chunks(self, chunks: list[StreamChunk]) -> None: | ||
| """Add a sequence of StreamChunks for the next async_stream() call. | ||
|
|
||
| Args: | ||
| chunks: Ordered list of StreamChunk objects to yield. | ||
| """ | ||
| self.stream_chunks.append(chunks) | ||
|
|
||
| async def async_stream( | ||
| self, | ||
| messages: list[dict], | ||
| system: str, | ||
| tools: list[dict], | ||
| model: str, | ||
| max_tokens: int, | ||
| interrupt_event: Optional[asyncio.Event] = None, | ||
| extended_thinking: bool = False, | ||
| ) -> AsyncIterator[StreamChunk]: | ||
| """Yield pre-configured StreamChunk sequences for testing. | ||
|
|
||
| Tracks each call in :attr:`calls` (same metadata as :meth:`complete`). | ||
| When pre-configured ``stream_chunks`` are available, yields them in | ||
| order. Otherwise falls back to a minimal ``text_delta`` + | ||
| ``message_stop`` pair derived from the normal response queue | ||
| (``responses`` / ``response_handler`` / ``default_response``). | ||
| """ | ||
| # Track the call so tests can assert on it | ||
| self.calls.append( | ||
| { | ||
| "messages": messages, | ||
| "system": system, | ||
| "tools": tools, | ||
| "model": model, | ||
| "max_tokens": max_tokens, | ||
| "extended_thinking": extended_thinking, | ||
| } | ||
| ) | ||
|
|
||
| if self.stream_index < len(self.stream_chunks): | ||
| chunks = self.stream_chunks[self.stream_index] | ||
| self.stream_index += 1 | ||
| else: | ||
| # Derive response text from the normal queue / handler | ||
| if self.response_handler: | ||
| resp = self.response_handler(messages) | ||
| text = resp.content | ||
| elif self.response_index < len(self.responses): | ||
| resp = self.responses[self.response_index] | ||
| self.response_index += 1 | ||
| text = resp.content | ||
| else: | ||
| text = self.default_response | ||
|
|
||
| chunks = [ | ||
| StreamChunk(type="text_delta", text=text), | ||
| StreamChunk( | ||
| type="message_stop", | ||
| stop_reason="end_turn", | ||
| input_tokens=len(str(messages)), | ||
| output_tokens=len(text), | ||
| tool_inputs_by_id={}, | ||
| ), | ||
| ] | ||
|
|
||
| for chunk in chunks: | ||
| if interrupt_event and interrupt_event.is_set(): | ||
| return | ||
| yield chunk | ||
|
Comment on lines
+190
to
+249
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep When no explicit 🤖 Prompt for AI Agents |
||
|
|
||
| def reset(self) -> None: | ||
| """Reset call tracking and response queue.""" | ||
| self.calls.clear() | ||
| self.responses.clear() | ||
| self.response_index = 0 | ||
| self.response_handler = None | ||
| self.stream_chunks.clear() | ||
| self.stream_index = 0 | ||
|
|
||
| @property | ||
| def call_count(self) -> int: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 2756
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 4957
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 1156
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 2665
🏁 Script executed:
cat -n codeframe/adapters/llm/anthropic.py | head -100Repository: frankbria/codeframe
Length of output: 3760
Move the try-except wrapping to the actual API call, not the dict assignment.
The try-except at lines 217–220 wraps only
kwargs["betas"] = ..., a dict assignment that cannot fail. If Anthropic's API rejects the unsupported-beta header, the error occurs at line 224 duringmessages.stream(**kwargs), which is outside the try-except. This means whenextended_thinking=Trueis used but the API doesn't support the beta feature, an unhandled exception will propagate instead of silently degrading as the docstring claims and assupports()implies.Wrap the actual
messages.stream()call with the try-except to catch unsupported-beta errors from the API.🤖 Prompt for AI Agents