Merged
2 changes: 2 additions & 0 deletions codeframe/adapters/llm/__init__.py
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
Message,
ModelSelector,
Purpose,
StreamChunk,
Tool,
ToolCall,
ToolResult,
@@ -40,6 +41,7 @@
"Message",
"ModelSelector",
"Purpose",
"StreamChunk",
"Tool",
"ToolCall",
"ToolResult",
115 changes: 114 additions & 1 deletion codeframe/adapters/llm/anthropic.py
Expand Up @@ -3,14 +3,16 @@
Provides Claude model access via the Anthropic API.
"""

import asyncio
import os
from typing import TYPE_CHECKING, Iterator, Optional
from typing import TYPE_CHECKING, AsyncIterator, Iterator, Optional

from codeframe.adapters.llm.base import (
LLMProvider,
LLMResponse,
ModelSelector,
Purpose,
StreamChunk,
Tool,
ToolCall,
)
@@ -172,6 +174,117 @@ async def async_complete(
except APIConnectionError as exc:
raise LLMConnectionError(str(exc)) from exc

def supports(self, capability: str) -> bool:
"""Return True for capabilities this provider supports."""
return capability == "extended_thinking"
Comment on lines +177 to +179
⚠️ Potential issue | 🟠 Major

Move the try-except wrapping to the actual API call, not the dict assignment.

The try-except at lines 217–220 wraps only `kwargs["betas"] = ...`, a dict assignment that cannot fail. If Anthropic's API rejects the unsupported beta header, the error is raised at line 224 during `messages.stream(**kwargs)`, which sits outside the try-except. So when `extended_thinking=True` is used but the API doesn't support the beta feature, an unhandled exception propagates instead of the call silently degrading as the docstring claims and as `supports()` implies.

Wrap the actual messages.stream() call with the try-except to catch unsupported-beta errors from the API.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `codeframe/adapters/llm/anthropic.py` around lines 177-179: the try-except currently wraps only the dict assignment `kwargs["betas"] = ...`, but the API error will happen on the call to `messages.stream(**kwargs)`; move the try-except so it encloses `messages.stream(**kwargs)` instead of the dict assignment. Specifically, keep setting `kwargs["betas"]` when `extended_thinking` is true, but call `messages.stream(**kwargs)` inside a try block, catch the API error that indicates unsupported betas (or a generic request/HTTP exception), then degrade by removing `kwargs["betas"]` and retrying or falling back to non-beta streaming so `extended_thinking` silently degrades as `supports()` promises. Ensure you reference the same variables and methods: `kwargs`, `"betas"`, `messages.stream`, and the `extended_thinking` handling.


async def async_stream(
self,
messages: list[dict],
system: str,
tools: list[dict],
model: str,
max_tokens: int,
interrupt_event: Optional[asyncio.Event] = None,
extended_thinking: bool = False,
) -> AsyncIterator[StreamChunk]:
"""Stream using Anthropic AsyncAnthropic SDK, yielding StreamChunk objects.

Translates Anthropic SDK events into the normalized StreamChunk format.
Tool inputs are collected and emitted in the final message_stop chunk
via tool_inputs_by_id, which is more reliable than streaming input deltas.

When ``extended_thinking=True``, requests interleaved thinking via the
Anthropic betas API. The flag is silently ignored on SDK versions that
do not support it.
"""
from anthropic import AsyncAnthropic

if self._async_client is None:
self._async_client = AsyncAnthropic(api_key=self.api_key)

# Convert messages to Anthropic API format (handles tool_calls/tool_results)
converted = self._convert_messages(messages)

kwargs: dict = {
"model": model,
"system": system,
"messages": converted,
"tools": tools,
"max_tokens": max_tokens,
}
Comment on lines +209 to +215
⚠️ Potential issue | 🟠 Major

Convert streamed messages the same way as completions.

complete() and async_complete() both normalize through _convert_messages(messages), but async_stream() forwards the raw dicts. The same tool_calls / tool_results payload that works in completion mode can therefore fail once callers switch to streaming.

🐛 Proposed fix
         kwargs: dict = {
             "model": model,
             "system": system,
-            "messages": messages,
+            "messages": self._convert_messages(messages),
             "tools": tools,
             "max_tokens": max_tokens,
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `codeframe/adapters/llm/anthropic.py` around lines 206-212: async_stream is forwarding raw message dicts while complete() and async_complete() use `_convert_messages(messages)`; update async_stream to normalize messages by calling `_convert_messages(messages)` before building/forwarding kwargs so the same tool_calls/tool_results payloads work for streaming. Locate async_stream, assign `converted = self._convert_messages(messages)` (or the equivalent method name), and pass that converted variable into kwargs instead of the original messages.


if extended_thinking:
kwargs["betas"] = ["interleaved-thinking-2025-05-14"]

active_tool_id: Optional[str] = None

# When extended_thinking is set, the beta header may be unsupported on
# older SDK versions. Retry without it rather than hard-failing.
try:
stream_ctx = self._async_client.messages.stream(**kwargs)
except Exception: # pragma: no cover
if extended_thinking:
kwargs.pop("betas", None)
stream_ctx = self._async_client.messages.stream(**kwargs)
else:
raise

async with stream_ctx as stream:
async for sdk_event in stream:
if interrupt_event and interrupt_event.is_set():
return

event_type = sdk_event.type

if event_type == "content_block_start":
block = sdk_event.content_block
if block.type == "tool_use":
active_tool_id = block.id
yield StreamChunk(
type="tool_use_start",
tool_id=block.id,
tool_name=block.name,
tool_input=getattr(block, "input", {}),
)

elif event_type == "content_block_delta":
delta = sdk_event.delta
if delta.type == "text_delta":
yield StreamChunk(type="text_delta", text=delta.text)
elif delta.type == "thinking_delta":
yield StreamChunk(type="thinking_delta", text=delta.thinking)
# input_json_delta: final inputs are rebuilt from message_stop

elif event_type == "content_block_stop":
if active_tool_id is not None:
yield StreamChunk(type="tool_use_stop")
active_tool_id = None

elif event_type == "message_stop":
# Flush any open tool block
if active_tool_id is not None:
yield StreamChunk(type="tool_use_stop")
active_tool_id = None

final_msg = await stream.get_final_message()
stop_reason = final_msg.stop_reason or "end_turn"

# Build tool_inputs_by_id from final content blocks
tool_inputs_by_id: dict = {}
if hasattr(final_msg, "content"):
for block in final_msg.content:
if getattr(block, "type", None) == "tool_use" and hasattr(block, "id"):
tool_inputs_by_id[block.id] = getattr(block, "input", {})

yield StreamChunk(
type="message_stop",
stop_reason=stop_reason,
input_tokens=final_msg.usage.input_tokens,
output_tokens=final_msg.usage.output_tokens,
tool_inputs_by_id=tool_inputs_by_id,
)

def stream(
self,
messages: list[dict],
102 changes: 101 additions & 1 deletion codeframe/adapters/llm/base.py
@@ -9,7 +9,7 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Iterator, Optional
from typing import AsyncIterator, Iterator, Optional


# ---------------------------------------------------------------------------
@@ -120,6 +120,53 @@ def for_purpose(self, purpose: Purpose) -> str:
return self.execution_model # Default fallback


@dataclass
class StreamChunk:
"""A normalized chunk from a streaming LLM response.

Provider-specific streaming formats are translated into this common type
by each :class:`LLMProvider` implementation.

Attributes:
type: Event type — one of ``"text_delta"``, ``"thinking_delta"``,
``"tool_use_start"``, ``"tool_use_stop"``, ``"message_stop"``.
text: Text content for ``text_delta`` and ``thinking_delta`` types.
tool_id: Tool call ID for ``tool_use_start``.
tool_name: Tool name for ``tool_use_start``.
tool_input: Tool input dict for ``tool_use_start`` (may be empty;
final inputs are provided in the ``message_stop`` chunk).
input_tokens: Input token count, populated for ``message_stop``.
output_tokens: Output token count, populated for ``message_stop``.
stop_reason: Why the model stopped, populated for ``message_stop``.
tool_inputs_by_id: Mapping of tool_id → final input dict, populated
for ``message_stop``. More reliable than streaming incremental
input deltas.

.. note:: ``tool_use_stop`` ordering differs by provider:

- **Anthropic**: emitted immediately when each tool call's content
block ends (``content_block_stop`` event), so consumers see
``tool_use_start → [deltas] → tool_use_stop`` interleaved.
- **OpenAI-compatible**: emitted after the full stream ends (before
``message_stop``), because the SSE protocol has no per-tool stop
marker. All ``tool_use_stop`` chunks arrive together at the end.

Consumers MUST use ``tool_inputs_by_id`` from the ``message_stop``
chunk for final tool inputs rather than relying on ``tool_use_stop``
ordering.
"""

type: str
text: Optional[str] = None
tool_id: Optional[str] = None
tool_name: Optional[str] = None
tool_input: Optional[dict] = None
input_tokens: Optional[int] = None
output_tokens: Optional[int] = None
stop_reason: Optional[str] = None
tool_inputs_by_id: Optional[dict] = None


@dataclass
class ToolCall:
"""Represents a tool call requested by the LLM.
@@ -332,6 +379,59 @@ async def async_complete(
lambda: self.complete(messages, purpose, tools, max_tokens, temperature, system),
)

def supports(self, capability: str) -> bool:
"""Check whether this provider supports an optional capability.

Args:
capability: Capability name, e.g. ``"extended_thinking"``.

Returns:
``True`` if the capability is supported, ``False`` otherwise.
"""
return False

# Not decorated with @abstractmethod intentionally: providers that only
# support synchronous completion (e.g. thin wrappers) don't need to
# implement streaming. Calling async_stream() on such a provider raises
# NotImplementedError at call time rather than at instantiation.
async def async_stream(
self,
messages: list[dict],
system: str,
tools: list[dict],
model: str,
max_tokens: int,
interrupt_event: Optional[asyncio.Event] = None,
extended_thinking: bool = False,
) -> AsyncIterator["StreamChunk"]:
"""Stream a completion as normalized :class:`StreamChunk` objects.

Subclasses should override this with a provider-specific implementation.
The default raises :exc:`NotImplementedError`.

Args:
messages: Conversation messages in the provider's expected format.
system: System prompt string.
tools: Already-serialized tool definitions (list of dicts).
model: Model identifier to use for this call.
max_tokens: Maximum output tokens.
interrupt_event: When set, the stream should stop at the next
opportunity.
extended_thinking: When ``True``, request extended thinking tokens
from providers that support them (see :meth:`supports`).
Providers that do not support this capability should silently
ignore the flag.

Yields:
:class:`StreamChunk` objects in order of generation.
"""
raise NotImplementedError(
f"{type(self).__name__} does not implement async_stream(). "
"Override this method in your provider subclass."
)
if False: # pragma: no cover # makes this an async generator
yield # type: ignore[misc]

def get_model(self, purpose: Purpose) -> str:
"""Get the model for a given purpose.

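Given the `StreamChunk` contract documented above (text via deltas, authoritative tool inputs only in `message_stop`, provider-dependent `tool_use_stop` ordering), a consumer loop can be sketched like this. The chunks here use `SimpleNamespace` as a stand-in for the real dataclass, and `fake_stream` is an illustrative provider, not one of the project's adapters.

```python
import asyncio
from types import SimpleNamespace as Chunk  # stand-in for StreamChunk


async def fake_stream():
    # Anthropic-style ordering: text deltas, then a final message_stop
    # that carries the authoritative tool inputs.
    yield Chunk(type="text_delta", text="hello ")
    yield Chunk(type="text_delta", text="world")
    yield Chunk(
        type="message_stop",
        stop_reason="end_turn",
        tool_inputs_by_id={"t1": {"path": "a.py"}},
    )


async def consume(stream, interrupt_event=None):
    text_parts: list[str] = []
    tool_inputs: dict = {}
    async for chunk in stream:
        if interrupt_event and interrupt_event.is_set():
            break  # honor interruption at the next chunk boundary
        if chunk.type == "text_delta":
            text_parts.append(chunk.text)
        elif chunk.type == "message_stop":
            # Use tool_inputs_by_id here, never tool_use_stop ordering,
            # since that ordering differs between providers.
            tool_inputs = chunk.tool_inputs_by_id or {}
    return "".join(text_parts), tool_inputs


text, tools = asyncio.run(consume(fake_stream()))
```

Because the consumer keys off `message_stop`, it works unchanged whether `tool_use_stop` chunks arrive interleaved (Anthropic) or batched at the end (OpenAI-compatible).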
77 changes: 76 additions & 1 deletion codeframe/adapters/llm/mock.py
@@ -4,13 +4,15 @@
Supports configurable responses and call tracking.
"""

from typing import Callable, Iterator, Optional
import asyncio
from typing import AsyncIterator, Callable, Iterator, Optional

from codeframe.adapters.llm.base import (
LLMProvider,
LLMResponse,
ModelSelector,
Purpose,
StreamChunk,
Tool,
ToolCall,
)
@@ -40,6 +42,8 @@ def __init__(
self.responses: list[LLMResponse] = []
self.response_index = 0
self.response_handler: Optional[Callable[[list[dict]], LLMResponse]] = None
self.stream_chunks: list[list[StreamChunk]] = []
self.stream_index = 0

def add_response(self, response: LLMResponse) -> None:
"""Add a canned response to the queue.
@@ -175,12 +179,83 @@ def stream(
for word in response.content.split():
yield word + " "

def add_stream_chunks(self, chunks: list[StreamChunk]) -> None:
"""Add a sequence of StreamChunks for the next async_stream() call.

Args:
chunks: Ordered list of StreamChunk objects to yield.
"""
self.stream_chunks.append(chunks)

async def async_stream(
self,
messages: list[dict],
system: str,
tools: list[dict],
model: str,
max_tokens: int,
interrupt_event: Optional[asyncio.Event] = None,
extended_thinking: bool = False,
) -> AsyncIterator[StreamChunk]:
"""Yield pre-configured StreamChunk sequences for testing.

Tracks each call in :attr:`calls` (same metadata as :meth:`complete`).
When pre-configured ``stream_chunks`` are available, yields them in
order. Otherwise falls back to a minimal ``text_delta`` +
``message_stop`` pair derived from the normal response queue
(``responses`` / ``response_handler`` / ``default_response``).
"""
# Track the call so tests can assert on it
self.calls.append(
{
"messages": messages,
"system": system,
"tools": tools,
"model": model,
"max_tokens": max_tokens,
"extended_thinking": extended_thinking,
}
)

if self.stream_index < len(self.stream_chunks):
chunks = self.stream_chunks[self.stream_index]
self.stream_index += 1
else:
# Derive response text from the normal queue / handler
if self.response_handler:
resp = self.response_handler(messages)
text = resp.content
elif self.response_index < len(self.responses):
resp = self.responses[self.response_index]
self.response_index += 1
text = resp.content
else:
text = self.default_response

chunks = [
StreamChunk(type="text_delta", text=text),
StreamChunk(
type="message_stop",
stop_reason="end_turn",
input_tokens=len(str(messages)),
output_tokens=len(text),
tool_inputs_by_id={},
),
]

for chunk in chunks:
if interrupt_event and interrupt_event.is_set():
return
yield chunk
Comment on lines +190 to +249
⚠️ Potential issue | 🟡 Minor

Keep async_stream() aligned with the rest of MockProvider.

When no explicit stream_chunks are queued, this path ignores response_handler and queued responses, and it never records the call. Tests that switch from complete()/stream() to async_stream() will silently get different behavior, and call_count / last_call stop reflecting streaming usage.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `codeframe/adapters/llm/mock.py` around lines 190-223: the default branch of async_stream ignores the provider's response queue and call logging; modify MockProvider.async_stream so that when no preconfigured stream_chunks are present it (1) records the call with the same metadata used by stream()/complete() so call_count and last_call stay accurate, (2) consumes from self.responses (or uses self.default_response) and invokes self.response_handler to generate the StreamChunk sequence instead of hardcoding chunks, and (3) yields those chunks while still honoring interrupt_event; reference async_stream, response_handler, self.responses, self.default_response, self.call_count and self.last_call to locate and update the logic.
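The alignment this comment describes — the streaming fallback consulting the same handler/queue/default chain as `complete()` while still recording the call — can be sketched with a toy double. All names below (`TinyMock`, `first_text`) are illustrative, not the project's actual `MockProvider`.

```python
import asyncio


class TinyMock:
    """Toy test double whose streaming path shares the response queue."""

    def __init__(self, default: str = "ok"):
        self.responses: list[str] = []
        self.response_index = 0
        self.response_handler = None
        self.default_response = default
        self.calls: list[dict] = []

    async def async_stream(self, messages):
        self.calls.append({"messages": messages})  # track like complete()
        if self.response_handler:
            text = self.response_handler(messages)
        elif self.response_index < len(self.responses):
            text = self.responses[self.response_index]
            self.response_index += 1
        else:
            text = self.default_response
        yield {"type": "text_delta", "text": text}
        yield {"type": "message_stop"}


async def first_text(provider, messages):
    async for chunk in provider.async_stream(messages):
        if chunk["type"] == "text_delta":
            return chunk["text"]


mock = TinyMock()
mock.responses.append("queued")
text = asyncio.run(first_text(mock, [{"role": "user", "content": "hi"}]))
```

Because the stream draws from the same queue, a test that switches from `complete()` to `async_stream()` sees the same canned responses, and call tracking keeps working.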


def reset(self) -> None:
"""Reset call tracking and response queue."""
self.calls.clear()
self.responses.clear()
self.response_index = 0
self.response_handler = None
self.stream_chunks.clear()
self.stream_index = 0

@property
def call_count(self) -> int: