Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion agent/core/agent_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,11 @@ async def execute_tool(tc, tool_name, tool_args, was_edited):
tc, tool_name, output, success, was_edited = result

if was_edited:
output = f"[Note: The user edited the script before execution. The output below reflects the user-modified version, not your original script.]\n\n{output}"
note = "[Note: The user edited the script before execution. The output below reflects the user-modified version, not your original script.]"
if isinstance(output, list):
output = [{"type": "text", "text": note}, *output]
else:
output = f"{note}\n\n{output}"

# Add tool result to context
tool_msg = Message(
Expand Down
71 changes: 40 additions & 31 deletions agent/core/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,52 +67,59 @@
NOT_ALLOWED_TOOL_NAMES = ["hf_jobs", "hf_doc_search", "hf_doc_fetch", "hf_whoami"]


def convert_mcp_content_to_string(content: list) -> str:
def convert_mcp_content_to_llm_content(content: list) -> str | list[dict]:
"""
Convert MCP content blocks to a string format compatible with LLM messages.
Convert MCP content blocks to a format compatible with LiteLLM messages.

Returns a plain string when content contains only text (backward-compatible
with text-only tool results). Returns a list of OpenAI-style content blocks
when any ImageContent is present, so vision-capable models receive the actual
image data instead of a placeholder.

Based on FastMCP documentation, content can be:
- TextContent: has .text field
- ImageContent: has .data and .mimeType fields
- ImageContent: has .data (base64) and .mimeType fields
- EmbeddedResource: has .resource field with .text or .blob

Args:
content: List of MCP content blocks

Returns:
String representation of the content suitable for LLM consumption
"""
if not content:
return ""

parts = []
blocks: list[dict] = []
has_image = False

for item in content:
if isinstance(item, TextContent):
# Extract text from TextContent blocks
parts.append(item.text)
blocks.append({"type": "text", "text": item.text})
elif isinstance(item, ImageContent):
# TODO: Handle images
# For images, include a description with MIME type
parts.append(f"[Image: {item.mimeType}]")
has_image = True
blocks.append(
{
"type": "image_url",
"image_url": {
"url": f"data:{item.mimeType};base64,{item.data}"
},
}
)
elif isinstance(item, EmbeddedResource):
# TODO: Handle embedded resources
# For embedded resources, try to extract text
resource = item.resource
if hasattr(resource, "text") and resource.text:
parts.append(resource.text)
blocks.append({"type": "text", "text": resource.text})
elif hasattr(resource, "blob") and resource.blob:
parts.append(
f"[Binary data: {resource.mimeType if hasattr(resource, 'mimeType') else 'unknown'}]"
mime = (
resource.mimeType
if hasattr(resource, "mimeType")
else "unknown"
)
blocks.append({"type": "text", "text": f"[Binary data: {mime}]"})
else:
parts.append(
f"[Resource: {resource.uri if hasattr(resource, 'uri') else 'unknown'}]"
)
uri = resource.uri if hasattr(resource, "uri") else "unknown"
blocks.append({"type": "text", "text": f"[Resource: {uri}]"})
else:
# Fallback: try to convert to string
parts.append(str(item))
blocks.append({"type": "text", "text": str(item)})

return "\n".join(parts)
if not has_image:
return "\n".join(b["text"] for b in blocks)
return blocks


@dataclass
Expand Down Expand Up @@ -248,12 +255,14 @@ async def call_tool(
arguments: dict[str, Any],
session: Any = None,
tool_call_id: str | None = None,
) -> tuple[str, bool]:
) -> tuple[str | list[dict], bool]:
"""
Call a tool and return (output_string, success_bool).
Call a tool and return (output, success_bool).

For MCP tools, converts the CallToolResult content blocks to a string.
For built-in tools, calls their handler directly.
For MCP tools, converts the CallToolResult content blocks via
convert_mcp_content_to_llm_content — plain str for text-only results,
list of OpenAI content blocks when images are present.
For built-in tools, calls their handler directly (always returns str).
"""
# Check if this is a built-in tool with a handler
tool = self.tools.get(tool_name)
Expand All @@ -275,7 +284,7 @@ async def call_tool(
if self._mcp_initialized:
try:
result = await self.mcp_client.call_tool(tool_name, arguments)
output = convert_mcp_content_to_string(result.content)
output = convert_mcp_content_to_llm_content(result.content)
return output, not result.is_error
except ToolError as e:
# Catch MCP tool errors and return them to the agent
Expand Down
188 changes: 188 additions & 0 deletions tests/unit/test_mcp_content_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Tests for convert_mcp_content_to_llm_content — MCP block → LiteLLM format."""

import base64

import pytest
from mcp.types import EmbeddedResource, ImageContent, TextContent

from agent.core.tools import convert_mcp_content_to_llm_content


# ── helpers ─────────────────────────────────────────────────────────────────


def _text(t: str) -> TextContent:
return TextContent(type="text", text=t)


def _image(data: str = "aGVsbG8=", mime: str = "image/png") -> ImageContent:
return ImageContent(type="image", data=data, mimeType=mime)


def _embedded_text(text: str, uri: str = "file:///test") -> EmbeddedResource:
from mcp.types import TextResourceContents

return EmbeddedResource(
type="resource",
resource=TextResourceContents(uri=uri, text=text),
)


def _embedded_blob(
blob: str = "aGVsbG8=", mime: str = "application/octet-stream", uri: str = "file:///test"
) -> EmbeddedResource:
from mcp.types import BlobResourceContents

return EmbeddedResource(
type="resource",
resource=BlobResourceContents(uri=uri, blob=blob, mimeType=mime),
)


# ── empty / trivial ──────────────────────────────────────────────────────────


def test_empty_list_returns_empty_string():
assert convert_mcp_content_to_llm_content([]) == ""


# ── text-only: must return str ───────────────────────────────────────────────


def test_single_text_block_returns_str():
result = convert_mcp_content_to_llm_content([_text("hello")])
assert result == "hello"
assert isinstance(result, str)


def test_multiple_text_blocks_joined_with_newline():
result = convert_mcp_content_to_llm_content([_text("line one"), _text("line two")])
assert result == "line one\nline two"
assert isinstance(result, str)


def test_embedded_text_resource_returns_str():
result = convert_mcp_content_to_llm_content([_embedded_text("resource text")])
assert result == "resource text"
assert isinstance(result, str)


def test_embedded_blob_resource_returns_binary_placeholder():
result = convert_mcp_content_to_llm_content([_embedded_blob(mime="image/jpeg")])
assert isinstance(result, str)
assert "[Binary data: image/jpeg]" in result


def test_embedded_resource_without_text_or_blob_returns_uri_placeholder():
from mcp.types import TextResourceContents

resource = EmbeddedResource(
type="resource",
resource=TextResourceContents(uri="file:///mystery", text=""),
)
result = convert_mcp_content_to_llm_content([resource])
assert isinstance(result, str)


def test_text_and_embedded_text_returns_str():
result = convert_mcp_content_to_llm_content(
[_text("intro"), _embedded_text("body")]
)
assert result == "intro\nbody"
assert isinstance(result, str)


# ── image present: must return list[dict] ───────────────────────────────────


def test_single_image_returns_list():
result = convert_mcp_content_to_llm_content([_image()])
assert isinstance(result, list)
assert len(result) == 1


def test_image_block_has_image_url_type():
result = convert_mcp_content_to_llm_content([_image()])
assert result[0]["type"] == "image_url"


def test_image_block_url_encodes_mime_and_data():
data = base64.b64encode(b"fake-png-bytes").decode()
result = convert_mcp_content_to_llm_content([_image(data=data, mime="image/png")])
url = result[0]["image_url"]["url"]
assert url == f"data:image/png;base64,{data}"


def test_jpeg_mime_type_is_preserved():
data = base64.b64encode(b"fake-jpeg").decode()
result = convert_mcp_content_to_llm_content([_image(data=data, mime="image/jpeg")])
url = result[0]["image_url"]["url"]
assert url.startswith("data:image/jpeg;base64,")


def test_mixed_text_and_image_returns_list():
result = convert_mcp_content_to_llm_content([_text("caption"), _image()])
assert isinstance(result, list)
assert len(result) == 2


def test_mixed_text_block_has_correct_fields():
result = convert_mcp_content_to_llm_content([_text("caption"), _image()])
text_block = result[0]
assert text_block["type"] == "text"
assert text_block["text"] == "caption"


def test_mixed_order_preserved():
data = base64.b64encode(b"px").decode()
result = convert_mcp_content_to_llm_content(
[_text("before"), _image(data=data), _text("after")]
)
assert isinstance(result, list)
assert len(result) == 3
assert result[0] == {"type": "text", "text": "before"}
assert result[1]["type"] == "image_url"
assert result[2] == {"type": "text", "text": "after"}


def test_multiple_images_all_appear_in_list():
data1 = base64.b64encode(b"img1").decode()
data2 = base64.b64encode(b"img2").decode()
result = convert_mcp_content_to_llm_content(
[_image(data=data1, mime="image/png"), _image(data=data2, mime="image/webp")]
)
assert isinstance(result, list)
assert len(result) == 2
assert f"data:image/png;base64,{data1}" in result[0]["image_url"]["url"]
assert f"data:image/webp;base64,{data2}" in result[1]["image_url"]["url"]


def test_image_with_embedded_blob_returns_list():
result = convert_mcp_content_to_llm_content([_image(), _embedded_blob()])
assert isinstance(result, list)
text_block = next(b for b in result if b["type"] == "text")
assert "[Binary data:" in text_block["text"]


# ── unknown content type fallback ───────────────────────────────────────────


def test_unknown_content_type_falls_back_to_str_representation():
class _Unknown:
def __str__(self):
return "mystery-content"

result = convert_mcp_content_to_llm_content([_Unknown()])
assert result == "mystery-content"
assert isinstance(result, str)


def test_unknown_content_alongside_image_becomes_text_block():
class _Unknown:
def __str__(self):
return "mystery"

result = convert_mcp_content_to_llm_content([_Unknown(), _image()])
assert isinstance(result, list)
text_block = result[0]
assert text_block == {"type": "text", "text": "mystery"}
Loading