Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies = [
"pyyaml (>=6.0.2,<7.0.0)",
"invoke (>=2.2.0,<3.0.0)",
"opentelemetry-api (>=1.33.1,<2.0.0)",
"opentelemetry-semantic-conventions (>=0.59b0,<0.61)",
"opentelemetry-semantic-conventions (>=0.60b1,<0.61)",
]

[project.optional-dependencies]
Expand All @@ -37,6 +37,7 @@ Repository = "https://github.com/mistralai/client-python.git"
[dependency-groups]
dev = [
"mypy==1.15.0",
"opentelemetry-sdk (>=1.33.1,<2.0.0)",
"pylint==3.2.3",
"pytest>=8.2.2,<9",
"pytest-asyncio>=0.23.7,<0.24",
Expand Down
657 changes: 469 additions & 188 deletions src/mistralai/extra/observability/otel.py

Large diffs are not rendered by default.

156 changes: 156 additions & 0 deletions src/mistralai/extra/observability/serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Serialization helpers for converting Mistral API payloads to OTEL GenAI convention formats.

These are pure functions with no OTEL dependencies — they transform dicts to JSON strings
matching the GenAI semantic convention schemas for input/output messages and tool definitions.

Schemas:
- Input messages: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-input-messages.json
- Output messages: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-output-messages.json
- Tool definitions: https://github.com/Cirilla-zmh/semantic-conventions/blob/cc4d07e7e56b80e9aa5904a3d524c134699da37f/docs/gen-ai/gen-ai-tool-definitions.json
"""

import json
from typing import Any


def _content_to_parts(content) -> list[dict]:
"""Convert Mistral message content to OTEL parts array.

Mistral content is either a string or an array of content chunks.
"""
if content is None:
return []
if isinstance(content, str):
return [{"type": "text", "content": content}]
# Content chunks array — map known Mistral types to OTEL part types
parts = []
for chunk in content:
if isinstance(chunk, str):
parts.append({"type": "text", "content": chunk})
elif isinstance(chunk, dict):
chunk_type = chunk.get("type", "")
if chunk_type == "text":
parts.append({"type": "text", "content": chunk.get("text", "")})
elif chunk_type == "thinking":
thinking = chunk.get("thinking", "")
if isinstance(thinking, list):
text_parts = [
sub.get("text", "")
for sub in thinking
if isinstance(sub, dict) and sub.get("type") == "text"
]
content_str = "\n".join(text_parts)
else: # Fallback
content_str = str(thinking)
parts.append({"type": "reasoning", "content": content_str})
elif chunk_type == "image_url":
url = chunk.get("image_url", {})
uri = url.get("url", "") if isinstance(url, dict) else str(url)
parts.append({"type": "uri", "modality": "image", "uri": uri})
else:
# Catch-all for other content chunk types
parts.append({"type": chunk_type})
return parts


def _tool_calls_to_parts(tool_calls: list[dict] | None) -> list[dict]:
"""Convert Mistral tool_calls to OTEL ToolCallRequestPart entries."""
if not tool_calls:
return []
parts = []
for tc in tool_calls:
func = tc.get("function", {}) or {}
part: dict = {
"type": "tool_call",
"name": func.get("name", ""),
}
if (tc_id := tc.get("id")) is not None:
part["id"] = tc_id
if (arguments := func.get("arguments")) is not None:
part["arguments"] = arguments
parts.append(part)
return parts


def serialize_input_message(message: dict[str, Any]) -> str:
    """Serialize a single input message per the OTEL GenAI convention.

    Schema: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-input-messages.json
    ChatMessage: {role (required), parts (required), name?}

    Conversation entry objects (e.g. function.result) don't carry a "role"
    field — they are detected via their "type" and mapped to the closest
    OTEL role.
    """
    # Conversation entry: function.result → OTEL tool role
    if message.get("type") == "function.result":
        response_part: dict = {
            "type": "tool_call_response",
            "response": message.get("result"),
        }
        call_id = message.get("tool_call_id")
        if call_id is not None:
            response_part["id"] = call_id
        return json.dumps({"role": "tool", "parts": [response_part]})

    # TODO: may need to handle other types for conversations (e.g. agent handoff)

    role = message.get("role", "unknown")

    if role == "tool":
        # A tool message is the response to a previous tool call
        tool_part: dict = {
            "type": "tool_call_response",
            "response": message.get("content"),
        }
        call_id = message.get("tool_call_id")
        if call_id is not None:
            tool_part["id"] = call_id
        parts = [tool_part]
    else:
        parts = _content_to_parts(message.get("content")) + _tool_calls_to_parts(
            message.get("tool_calls")
        )

    return json.dumps({"role": role, "parts": parts})


def serialize_output_message(choice: dict[str, Any]) -> str:
    """Serialize a single output choice/message per the OTEL GenAI convention.

    Schema: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-output-messages.json
    OutputMessage: {role (required), parts (required), finish_reason (required), name?}
    """
    msg = choice.get("message") or {}
    payload: dict[str, Any] = {
        "role": msg.get("role", "assistant"),
        "parts": _content_to_parts(msg.get("content"))
        + _tool_calls_to_parts(msg.get("tool_calls")),
        "finish_reason": choice.get("finish_reason", ""),
    }
    return json.dumps(payload)


def serialize_tool_definition(tool: dict[str, Any]) -> str | None:
"""Flatten a Mistral tool definition to the OTEL GenAI convention schema.

Mistral format: {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}
OTEL format: {"type": "function", "name": ..., "description": ..., "parameters": ...}

Schema, still under review: https://github.com/Cirilla-zmh/semantic-conventions/blob/cc4d07e7e56b80e9aa5904a3d524c134699da37f/docs/gen-ai/gen-ai-tool-definitions.json
"""
# Early exit conditions: only functions supported for now, and name is required
type = tool.get("type", "function")
func = tool.get("function")
if not func:
return None
name = func.get("name")
if not name:
return None
serialized: dict = {"type": type, "name": name}
if (description := func.get("description")) is not None:
serialized["description"] = description
if (parameters := func.get("parameters")) is not None:
serialized["parameters"] = parameters
return json.dumps(serialized)
98 changes: 98 additions & 0 deletions src/mistralai/extra/observability/streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Streaming response helpers for OTEL tracing.

Pure functions that parse SSE byte streams and accumulate CompletionChunk
deltas into a ChatCompletionResponse-shaped dict suitable for span enrichment.

TODO: supports chat and agent completion streaming endpoints. Evolutions will
be necessary to support other streaming endpoints (e.g. conversations).

NOTE: The SSE bytes are re-parsed here even though EventStream already
parsed them during iteration.
TracedResponse sits below EventStream and can only accumulate raw bytes; it
has no access to the decoded events. Hooking into EventStream could eliminate
this double-parse, but EventStream is Speakeasy-generated code.
"""

from typing import Any

from mistralai.client.models import CompletionChunk, UsageInfo


def parse_sse_chunks(raw_sse_bytes: bytes) -> list[CompletionChunk]:
    """Parse raw SSE bytes into a list of typed CompletionChunk models.

    Only CompletionChunk is handled. If new SSE-streamed response types
    are added, parsing and typing here will need updating.
    """
    decoded = raw_sse_bytes.decode("utf-8", errors="replace")
    parsed: list[CompletionChunk] = []
    for raw_line in decoded.split("\n"):
        stripped = raw_line.strip()
        if not stripped.startswith("data: "):
            continue
        body = stripped[len("data: "):]
        # The terminal sentinel carries no payload
        if body == "[DONE]":
            continue
        try:
            parsed.append(CompletionChunk.model_validate_json(body))
        except Exception:
            # Skip malformed payloads rather than failing the whole stream
            continue
    return parsed


def accumulate_chunks_to_response_dict(
    chunks: list[CompletionChunk],
) -> dict[str, Any]:
    """Accumulate streaming CompletionChunk deltas into a ChatCompletionResponse-shaped dict.

    Top-level fields (id, model, usage) are taken from the first chunk that
    provides them; per-choice message content, roles, finish reasons, and
    tool-call fragments are concatenated across chunks in arrival order.
    Choices are keyed by their index so interleaved deltas merge correctly.
    """
    response_id: str | None = None
    model: str | None = None
    usage: UsageInfo | None = None
    # choice index -> accumulated {"message": ..., "finish_reason": ...} dict
    choices: dict[int, dict[str, Any]] = {}

    for chunk in chunks:
        # Keep the first non-falsy value seen for each top-level field
        response_id = response_id or chunk.id
        model = model or chunk.model
        usage = usage or chunk.usage

        for choice in chunk.choices:
            accumulated = choices.setdefault(
                choice.index,
                {
                    "message": {"role": "assistant", "content": ""},
                    "finish_reason": "",
                },
            )
            msg = accumulated["message"]
            delta = choice.delta
            # Role usually arrives only on the first delta of a choice
            if isinstance(delta.role, str):
                msg["role"] = delta.role
            if isinstance(delta.content, str) and delta.content:
                msg["content"] += delta.content
            # finish_reason arrives on the final delta of a choice
            if isinstance(choice.finish_reason, str):
                accumulated["finish_reason"] = choice.finish_reason
            if isinstance(delta.tool_calls, list):
                tc_list = msg.setdefault("tool_calls", [])
                for tc in delta.tool_calls:
                    # Deltas may target a specific slot; otherwise append a new one
                    tc_idx = tc.index if tc.index is not None else len(tc_list)
                    # Pad the list so tc_idx is addressable even if slots arrive out of order
                    while len(tc_list) <= tc_idx:
                        tc_list.append(
                            {"id": None, "function": {"name": "", "arguments": ""}}
                        )
                    # ToolCall.id defaults to the string "null" (Speakeasy codegen quirk)
                    if tc.id is not None and tc.id != "null":
                        tc_list[tc_idx]["id"] = tc.id
                    # Name and arguments stream in fragments; concatenate them
                    if tc.function.name:
                        tc_list[tc_idx]["function"]["name"] += tc.function.name
                    if isinstance(tc.function.arguments, str) and tc.function.arguments:
                        tc_list[tc_idx]["function"]["arguments"] += (
                            tc.function.arguments
                        )

    result: dict[str, Any] = {
        "id": response_id,
        "model": model,
        # Emit choices in index order regardless of arrival order
        "choices": [choices[idx] for idx in sorted(choices)],
    }
    if usage is not None:
        # by_alias keeps the wire-format key names in the serialized usage dict
        result["usage"] = usage.model_dump(mode="json", by_alias=True)
    return result
54 changes: 34 additions & 20 deletions src/mistralai/extra/run/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,28 @@
import opentelemetry.semconv._incubating.attributes.gen_ai_attributes as gen_ai_attributes
from griffe import (
Docstring,
DocstringSectionKind,
DocstringSectionText,
DocstringParameter,
DocstringSection,
DocstringSectionKind,
DocstringSectionText,
)
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from pydantic import Field, create_model
from pydantic.fields import FieldInfo

from mistralai.extra.exceptions import RunException
from mistralai.extra.mcp.base import MCPClientProtocol
from mistralai.extra.observability.otel import GenAISpanEnum, MistralAIAttributes, set_available_attributes
from mistralai.extra.run.result import RunOutputEntries
from mistralai.client.models import (
FunctionResultEntry,
FunctionTool,
Function,
FunctionCallEntry,
FunctionResultEntry,
FunctionTool,
)

from mistralai.extra.exceptions import RunException
from mistralai.extra.mcp.base import MCPClientProtocol
from mistralai.extra.observability.otel import (
set_available_attributes,
)
from mistralai.extra.run.result import RunOutputEntries

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -193,22 +195,35 @@ async def create_function_result(
else function_call.arguments
)
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(GenAISpanEnum.function_call(function_call.name)) as span:
with tracer.start_as_current_span(
f"{gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value} {function_call.name}"
) as span:
# Always record identity attributes so the span is useful even on error
function_call_attributes = {
gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value,
gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value,
gen_ai_attributes.GEN_AI_TOOL_CALL_ID: function_call.id,
gen_ai_attributes.GEN_AI_TOOL_CALL_ARGUMENTS: function_call.arguments
if isinstance(function_call.arguments, str)
else json.dumps(function_call.arguments),
gen_ai_attributes.GEN_AI_TOOL_NAME: function_call.name,
gen_ai_attributes.GEN_AI_TOOL_TYPE: "function",
}
set_available_attributes(span, function_call_attributes)
try:
if isinstance(run_tool, RunFunction):
res = run_tool.callable(**arguments)
elif isinstance(run_tool, RunCoroutine):
res = await run_tool.awaitable(**arguments)
elif isinstance(run_tool, RunMCPTool):
res = await run_tool.mcp_client.execute_tool(function_call.name, arguments)
function_call_attributes = {
gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value,
gen_ai_attributes.GEN_AI_TOOL_CALL_ID: function_call.id,
MistralAIAttributes.MISTRAL_AI_TOOL_CALL_ARGUMENTS: str(function_call.arguments),
gen_ai_attributes.GEN_AI_TOOL_NAME: function_call.name
}
set_available_attributes(span, function_call_attributes)
res = await run_tool.mcp_client.execute_tool(
function_call.name, arguments
)
result_str = res if isinstance(res, str) else json.dumps(res)
span.set_attribute(gen_ai_attributes.GEN_AI_TOOL_CALL_RESULT, result_str)
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
if continue_on_fn_error is True:
return FunctionResultEntry(
tool_call_id=function_call.tool_call_id,
Expand All @@ -219,8 +234,7 @@ async def create_function_result(
) from e

return FunctionResultEntry(
tool_call_id=function_call.tool_call_id,
result=res if isinstance(res, str) else json.dumps(res),
tool_call_id=function_call.tool_call_id, result=result_str
)


Expand Down
Loading