diff --git a/pyproject.toml b/pyproject.toml index bb175abf..f9562708 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ dependencies = [ "pyyaml (>=6.0.2,<7.0.0)", "invoke (>=2.2.0,<3.0.0)", "opentelemetry-api (>=1.33.1,<2.0.0)", - "opentelemetry-semantic-conventions (>=0.59b0,<0.61)", + "opentelemetry-semantic-conventions (>=0.60b1,<0.61)", ] [project.optional-dependencies] @@ -37,6 +37,7 @@ Repository = "https://github.com/mistralai/client-python.git" [dependency-groups] dev = [ "mypy==1.15.0", + "opentelemetry-sdk (>=1.33.1,<2.0.0)", "pylint==3.2.3", "pytest>=8.2.2,<9", "pytest-asyncio>=0.23.7,<0.24", diff --git a/src/mistralai/extra/observability/otel.py b/src/mistralai/extra/observability/otel.py index 1f7e2c2f..45c33535 100644 --- a/src/mistralai/extra/observability/otel.py +++ b/src/mistralai/extra/observability/otel.py @@ -1,3 +1,9 @@ +"""OTEL conventions for gen AI may be found at: + +https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/ +https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/ +""" + import copy import json import logging @@ -5,40 +11,46 @@ import traceback from datetime import datetime, timezone from enum import Enum +from typing import Any import httpx import opentelemetry.semconv._incubating.attributes.gen_ai_attributes as gen_ai_attributes import opentelemetry.semconv._incubating.attributes.http_attributes as http_attributes +import opentelemetry.semconv.attributes.error_attributes as error_attributes import opentelemetry.semconv.attributes.server_attributes as server_attributes +from opentelemetry import context as context_api from opentelemetry import propagate, trace from opentelemetry.trace import Span, Status, StatusCode, Tracer, set_span_in_context +from .serialization import ( + serialize_input_message, + serialize_output_message, + serialize_tool_definition, +) +from .streaming import accumulate_chunks_to_response_dict, parse_sse_chunks + logger = logging.getLogger(__name__) OTEL_SERVICE_NAME: str = "mistralai_sdk" MISTRAL_SDK_OTEL_TRACER_NAME: str = OTEL_SERVICE_NAME + "_tracer" -MISTRAL_SDK_DEBUG_TRACING: bool = os.getenv("MISTRAL_SDK_DEBUG_TRACING", "false").lower() == "true" +MISTRAL_SDK_DEBUG_TRACING: bool = ( + os.getenv("MISTRAL_SDK_DEBUG_TRACING", "false").lower() == "true" +) DEBUG_HINT: str = "To see detailed tracing logs, set MISTRAL_SDK_DEBUG_TRACING=true." class MistralAIAttributes: - MISTRAL_AI_TOTAL_TOKENS = "mistral_ai.request.total_tokens" - MISTRAL_AI_TOOL_CALL_ARGUMENTS = "mistral_ai.tool.call.arguments" - MISTRAL_AI_MESSAGE_ID = "mistral_ai.message.id" - MISTRAL_AI_OPERATION_NAME= "mistral_ai.operation.name" MISTRAL_AI_OCR_USAGE_PAGES_PROCESSED = "mistral_ai.ocr.usage.pages_processed" MISTRAL_AI_OCR_USAGE_DOC_SIZE_BYTES = "mistral_ai.ocr.usage.doc_size_bytes" - MISTRAL_AI_OPERATION_ID = "mistral_ai.operation.id" - MISTRAL_AI_ERROR_TYPE = "mistral_ai.error.type" - MISTRAL_AI_ERROR_MESSAGE = "mistral_ai.error.message" MISTRAL_AI_ERROR_CODE = "mistral_ai.error.code" - MISTRAL_AI_FUNCTION_CALL_ARGUMENTS = "mistral_ai.function.call.arguments" + class MistralAINameValues(Enum): OCR = "ocr" + class TracingErrors(Exception, Enum): FAILED_TO_CREATE_SPAN_FOR_REQUEST = "Failed to create span for request." FAILED_TO_ENRICH_SPAN_WITH_RESPONSE = "Failed to enrich span with response." @@ -48,28 +60,74 @@ class TracingErrors(Exception, Enum): def __str__(self): return str(self.value) + class GenAISpanEnum(str, Enum): CONVERSATION = "conversation" - CONV_REQUEST = "POST /v1/conversations" - EXECUTE_TOOL = "execute_tool" VALIDATE_RUN = "validate_run" - @staticmethod - def function_call(func_name: str): - return f"function_call[{func_name}]" - def parse_time_to_nanos(ts: str) -> int: dt = datetime.fromisoformat(ts.replace("Z", "+00:00")).astimezone(timezone.utc) return int(dt.timestamp() * 1e9) -def set_available_attributes(span: Span, attributes: dict) -> None: + +def _infer_gen_ai_operation_name( + operation_id: str, +) -> gen_ai_attributes.GenAiOperationNameValues | None: + """Infer the GenAI operation name from the operation_id using rule-based matching.""" + if "chat_completion" in operation_id or operation_id == "stream_chat": + return gen_ai_attributes.GenAiOperationNameValues.CHAT + if ( + "agents_create" in operation_id or "agents_update" in operation_id + ) and "alias" not in operation_id: + return gen_ai_attributes.GenAiOperationNameValues.CREATE_AGENT + if "agents_completion" in operation_id or operation_id == "stream_agents": + return gen_ai_attributes.GenAiOperationNameValues.INVOKE_AGENT + if "conversations" in operation_id and any( + action in operation_id for action in ("start", "append", "restart") + ): + return gen_ai_attributes.GenAiOperationNameValues.INVOKE_AGENT + if "fim" in operation_id: + return gen_ai_attributes.GenAiOperationNameValues.TEXT_COMPLETION + if "embeddings" in operation_id: + return gen_ai_attributes.GenAiOperationNameValues.EMBEDDINGS + if "ocr_post" in operation_id: + return gen_ai_attributes.GenAiOperationNameValues.GENERATE_CONTENT + # TODO: Handle transcriptions (audio_api_v1_transcriptions_post[_stream]) + return None + + +def _build_genai_span_name( + gen_ai_op: gen_ai_attributes.GenAiOperationNameValues, body: dict[str, Any] +) -> str: + """Build span name per GenAI semantic conventions. + + - Chat/text_completion/embeddings: "{operation_name} {model}" + - create_agent/invoke_agent: "{operation_name} {agent_name}" + - execute_tool: "execute_tool {gen_ai.tool.name}" + """ + op_name = gen_ai_op.value + if gen_ai_op in { + gen_ai_attributes.GenAiOperationNameValues.CREATE_AGENT, + gen_ai_attributes.GenAiOperationNameValues.INVOKE_AGENT, + }: + agent_name = body.get("name", "") + return f"{op_name} {agent_name}" if agent_name else op_name + if gen_ai_op is gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL: + tool_name = body.get("name", "") + return f"{op_name} {tool_name}" if tool_name else op_name + model = body.get("model", "") + return f"{op_name} {model}" if model else op_name + + +def set_available_attributes(span: Span, attributes: dict[str, Any]) -> None: for attribute, value in attributes.items(): if value: span.set_attribute(attribute, value) -def enrich_span_from_request(span: Span, request: httpx.Request) -> Span: +def _set_http_attributes(span: Span, operation_id: str, request: httpx.Request) -> None: + """Set HTTP and server attributes on the span.""" if not request.url.port: # From httpx doc: # Note that the URL class performs port normalization as per the WHATWG spec. @@ -84,120 +142,275 @@ def enrich_span_from_request(span: Span, request: httpx.Request) -> Span: else: port = request.url.port - span.set_attributes({ - http_attributes.HTTP_REQUEST_METHOD: request.method, - http_attributes.HTTP_URL: str(request.url), - server_attributes.SERVER_ADDRESS: request.headers.get("host", ""), - server_attributes.SERVER_PORT: port - }) - if request._content: - request_body = json.loads(request._content) - - attributes = { - gen_ai_attributes.GEN_AI_REQUEST_CHOICE_COUNT: request_body.get("n", None), - gen_ai_attributes.GEN_AI_REQUEST_ENCODING_FORMATS: request_body.get("encoding_formats", None), - gen_ai_attributes.GEN_AI_REQUEST_FREQUENCY_PENALTY: request_body.get("frequency_penalty", None), - gen_ai_attributes.GEN_AI_REQUEST_MAX_TOKENS: request_body.get("max_tokens", None), - gen_ai_attributes.GEN_AI_REQUEST_MODEL: request_body.get("model", None), - gen_ai_attributes.GEN_AI_REQUEST_PRESENCE_PENALTY: request_body.get("presence_penalty", None), - gen_ai_attributes.GEN_AI_REQUEST_SEED: request_body.get("random_seed", None), - gen_ai_attributes.GEN_AI_REQUEST_STOP_SEQUENCES: request_body.get("stop", None), - gen_ai_attributes.GEN_AI_REQUEST_TEMPERATURE: request_body.get("temperature", None), - gen_ai_attributes.GEN_AI_REQUEST_TOP_P: request_body.get("top_p", None), - gen_ai_attributes.GEN_AI_REQUEST_TOP_K: request_body.get("top_k", None), - # Input messages are likely to be large, containing user/PII data and other sensitive information. - # Also structured attributes are not yet supported on spans in Python. - # For those reasons, we will not record the input messages for now. - gen_ai_attributes.GEN_AI_INPUT_MESSAGES: None, + span.set_attributes( + { + http_attributes.HTTP_REQUEST_METHOD: request.method, + http_attributes.HTTP_URL: str(request.url), + server_attributes.SERVER_ADDRESS: request.headers.get("host", ""), + server_attributes.SERVER_PORT: port, } - # Set attributes only if they are not None. - # From OpenTelemetry documentation: None is not a valid attribute value per spec / is not a permitted value type for an attribute. - set_available_attributes(span, attributes) - return span + ) + + +def _enrich_request_genai_attrs( + span: Span, + gen_ai_op: gen_ai_attributes.GenAiOperationNameValues, + request_body: dict[str, Any], +) -> None: + """Set GenAI request attributes: model params, input messages, tool definitions.""" + # Update span name per GenAI semantic conventions, now that we have the parsed request body. + span.update_name(_build_genai_span_name(gen_ai_op, request_body)) + + attributes = { + gen_ai_attributes.GEN_AI_REQUEST_CHOICE_COUNT: request_body.get("n"), + gen_ai_attributes.GEN_AI_REQUEST_ENCODING_FORMATS: request_body.get( + "encoding_formats" + ), + gen_ai_attributes.GEN_AI_REQUEST_FREQUENCY_PENALTY: request_body.get( + "frequency_penalty" + ), + gen_ai_attributes.GEN_AI_REQUEST_MAX_TOKENS: request_body.get("max_tokens"), + gen_ai_attributes.GEN_AI_REQUEST_MODEL: request_body.get("model"), + gen_ai_attributes.GEN_AI_REQUEST_PRESENCE_PENALTY: request_body.get( + "presence_penalty" + ), + gen_ai_attributes.GEN_AI_REQUEST_SEED: request_body.get("random_seed"), + gen_ai_attributes.GEN_AI_REQUEST_STOP_SEQUENCES: request_body.get("stop"), + gen_ai_attributes.GEN_AI_REQUEST_TEMPERATURE: request_body.get("temperature"), + gen_ai_attributes.GEN_AI_REQUEST_TOP_P: request_body.get("top_p"), + gen_ai_attributes.GEN_AI_REQUEST_TOP_K: request_body.get("top_k"), + } + # Chat/agent completion API uses messages in request body; conversation API uses inputs + input_messages = request_body.get("messages") or request_body.get("inputs") + if isinstance(input_messages, str): + attributes[gen_ai_attributes.GEN_AI_INPUT_MESSAGES] = [ + serialize_input_message({"role": "user", "content": input_messages}) + ] + elif isinstance(input_messages, list): + attributes[gen_ai_attributes.GEN_AI_INPUT_MESSAGES] = list( + map(serialize_input_message, input_messages) + ) + # Tool definitions + if tools := request_body.get("tools"): + attributes[gen_ai_attributes.GEN_AI_TOOL_DEFINITIONS] = list( + filter(None, map(serialize_tool_definition, tools)) + ) + # TODO: For agent start conversation, add agent id and version attributes here ? -def enrich_span_from_response(tracer: trace.Tracer, span: Span, operation_id: str, response: httpx.Response) -> None: - span.set_status(Status(StatusCode.OK)) - response_data = json.loads(response.content) + set_available_attributes(span, attributes) - # Base attributes - attributes: dict[str, str | int] = { - http_attributes.HTTP_RESPONSE_STATUS_CODE: response.status_code, - MistralAIAttributes.MISTRAL_AI_OPERATION_ID: operation_id, - gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value - } - # Add usage attributes if available +def enrich_span_from_request( + span: Span, operation_id: str, request: httpx.Request +) -> Span: + _set_http_attributes(span, operation_id, request) + + gen_ai_op = _infer_gen_ai_operation_name(operation_id) + if gen_ai_op is None: + return span + + span.set_attributes( + { + gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_op.value, + gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value, + } + ) + + if request.content: + request_body = json.loads(request.content) + _enrich_request_genai_attrs(span, gen_ai_op, request_body) + + return span + + +def _enrich_response_genai_attrs( + span: Span, + gen_ai_op: gen_ai_attributes.GenAiOperationNameValues, + response_data: dict[str, Any], +) -> None: + """Set common GenAI response attributes: response ID, model, choices, usage.""" + attributes: dict[str, Any] = {} + + if gen_ai_op is not gen_ai_attributes.GenAiOperationNameValues.CREATE_AGENT: + # id has another meaning for create agent operation (id of the agent) + attributes[gen_ai_attributes.GEN_AI_RESPONSE_ID] = response_data.get("id") + attributes[gen_ai_attributes.GEN_AI_RESPONSE_MODEL] = response_data.get("model") + + # Finish reasons and output messages from choices + choices = response_data.get("choices", []) + finish_reasons = [c.get("finish_reason") for c in choices if c.get("finish_reason")] + if finish_reasons: + attributes[gen_ai_attributes.GEN_AI_RESPONSE_FINISH_REASONS] = finish_reasons + if choices: + attributes[gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES] = list( + map(serialize_output_message, choices) + ) + + # Usage usage = response_data.get("usage", {}) if usage: - attributes.update({ - gen_ai_attributes.GEN_AI_USAGE_PROMPT_TOKENS: usage.get("prompt_tokens", 0), - gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS: usage.get("completion_tokens", 0), - MistralAIAttributes.MISTRAL_AI_TOTAL_TOKENS: usage.get("total_tokens", 0) - }) - - span.set_attributes(attributes) - if operation_id == "agents_api_v1_agents_create": - # Semantics from https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/#create-agent-span - agent_attributes = { - gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.CREATE_AGENT.value, - gen_ai_attributes.GEN_AI_AGENT_DESCRIPTION: response_data.get("description", ""), - gen_ai_attributes.GEN_AI_AGENT_ID: response_data.get("id", ""), - gen_ai_attributes.GEN_AI_AGENT_NAME: response_data.get("name", ""), - gen_ai_attributes.GEN_AI_REQUEST_MODEL: response_data.get("model", ""), - gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS: response_data.get("instructions", "") - } - span.set_attributes(agent_attributes) - if operation_id in ["agents_api_v1_conversations_start", "agents_api_v1_conversations_append"]: - outputs = response_data.get("outputs", []) - conversation_attributes = { - gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.INVOKE_AGENT.value, - gen_ai_attributes.GEN_AI_CONVERSATION_ID: response_data.get("conversation_id", "") - } - span.set_attributes(conversation_attributes) - parent_context = set_span_in_context(span) - - for output in outputs: - # TODO: Only enrich the spans if it's a single turn conversation. - # Multi turn conversations are handled in the extra.run.tools.create_function_result function - if output["type"] == "function.call": - pass - if output["type"] == "tool.execution": - start_ns = parse_time_to_nanos(output["created_at"]) - end_ns = parse_time_to_nanos(output["completed_at"]) - child_span = tracer.start_span("Tool Execution", start_time=start_ns, context=parent_context) - child_span.set_attributes({"agent.trace.public": ""}) - tool_attributes = { - gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value, - gen_ai_attributes.GEN_AI_TOOL_CALL_ID: output.get("id", ""), - MistralAIAttributes.MISTRAL_AI_TOOL_CALL_ARGUMENTS: output.get("arguments", ""), - gen_ai_attributes.GEN_AI_TOOL_NAME: output.get("name", "") - } - child_span.set_attributes(tool_attributes) - child_span.end(end_time=end_ns) - if output["type"] == "message.output": - start_ns = parse_time_to_nanos(output["created_at"]) - end_ns = parse_time_to_nanos(output["completed_at"]) - child_span = tracer.start_span("Message Output", start_time=start_ns, context=parent_context) - child_span.set_attributes({"agent.trace.public": ""}) - message_attributes = { - gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.CHAT.value, - gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value, - MistralAIAttributes.MISTRAL_AI_MESSAGE_ID: output.get("id", ""), - gen_ai_attributes.GEN_AI_AGENT_ID: output.get("agent_id", ""), - gen_ai_attributes.GEN_AI_REQUEST_MODEL: output.get("model", "") - } - child_span.set_attributes(message_attributes) - child_span.end(end_time=end_ns) + attributes.update( + { + gen_ai_attributes.GEN_AI_USAGE_INPUT_TOKENS: usage.get( + "prompt_tokens", 0 + ), + gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS: usage.get( + "completion_tokens", 0 + ), + } + ) + + set_available_attributes(span, attributes) + + +def _enrich_create_agent(span: Span, response_data: dict[str, Any]) -> None: + """Set agent-specific attributes from create_agent response. + + Semantics: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/#create-agent-span + """ + agent_attributes = { + gen_ai_attributes.GEN_AI_AGENT_DESCRIPTION: response_data.get("description"), + gen_ai_attributes.GEN_AI_AGENT_ID: response_data.get("id"), + gen_ai_attributes.GEN_AI_AGENT_NAME: response_data.get("name"), + # As of 2026-03-02: in convention, but not yet in opentelemetry-semantic-conventions + "gen_ai.agent.version": str(response_data.get("version")), + gen_ai_attributes.GEN_AI_REQUEST_MODEL: response_data.get("model"), + gen_ai_attributes.GEN_AI_SYSTEM_INSTRUCTIONS: response_data.get("instructions"), + } + set_available_attributes(span, agent_attributes) + + +def _create_tool_execution_child_span( + tracer: trace.Tracer, parent_context: context_api.Context, output: dict[str, Any] +) -> None: + """Create a child span for a tool.execution conversation output.""" + start_ns = parse_time_to_nanos(output["created_at"]) + end_ns = parse_time_to_nanos(output["completed_at"]) + op_name = gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL + span_name = _build_genai_span_name(op_name, output) + child_span = tracer.start_span( + span_name, start_time=start_ns, context=parent_context + ) + child_span.set_attributes({"agent.trace.public": ""}) + tool_arguments = output.get("arguments") + # The tool call result is in the "info" field, if provided + tool_result = output.get("info") + tool_attributes = { + gen_ai_attributes.GEN_AI_OPERATION_NAME: op_name.value, + gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value, + gen_ai_attributes.GEN_AI_TOOL_CALL_ID: output.get("id"), + gen_ai_attributes.GEN_AI_TOOL_CALL_ARGUMENTS: tool_arguments + if isinstance(tool_arguments, str) + else (json.dumps(tool_arguments) if tool_arguments else None), + gen_ai_attributes.GEN_AI_TOOL_CALL_RESULT: tool_result + and json.dumps(tool_result), + gen_ai_attributes.GEN_AI_TOOL_NAME: output.get("name"), + gen_ai_attributes.GEN_AI_TOOL_TYPE: "extension", + } + set_available_attributes(child_span, tool_attributes) + child_span.end(end_time=end_ns) + + +def _create_message_output_child_span( + tracer: trace.Tracer, parent_context: context_api.Context, output: dict[str, Any] +) -> None: + """Create a child span for a message.output conversation output.""" + start_ns = parse_time_to_nanos(output["created_at"]) + end_ns = parse_time_to_nanos(output["completed_at"]) + op_name = gen_ai_attributes.GenAiOperationNameValues.CHAT + span_name = _build_genai_span_name(op_name, output) + child_span = tracer.start_span( + span_name, start_time=start_ns, context=parent_context + ) + child_span.set_attributes({"agent.trace.public": ""}) + # Wrap the flat conversation output as a choice dict so we + # can reuse serialize_output_message (which also handles + # tool_calls, not just content). + choice_wrapper: dict = { + "message": output, + "finish_reason": output.get("finish_reason", ""), + } + message_attributes = { + gen_ai_attributes.GEN_AI_OPERATION_NAME: op_name.value, + gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value, + gen_ai_attributes.GEN_AI_RESPONSE_ID: output.get("id"), + gen_ai_attributes.GEN_AI_AGENT_ID: output.get("agent_id"), + gen_ai_attributes.GEN_AI_RESPONSE_MODEL: output.get("model"), + gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES: [ + serialize_output_message(choice_wrapper) + ], + } + set_available_attributes(child_span, message_attributes) + child_span.end(end_time=end_ns) + + +def _enrich_invoke_agent( + tracer: trace.Tracer, span: Span, response_data: dict[str, Any] +) -> None: + """Set invoke_agent attributes and create child spans for conversation outputs.""" + conversation_attributes = { + gen_ai_attributes.GEN_AI_CONVERSATION_ID: response_data.get("conversation_id"), + # We don't have more agent attributes available in the response data + # (agent id, name, version, description). For start conversation operation, + # we could get it from the request; see associated TODO + } + set_available_attributes(span, conversation_attributes) + + outputs = response_data.get("outputs", []) + parent_context = set_span_in_context(span) + for output in outputs: + output_type = output.get("type") + if not output_type: + continue # Safety net + if output_type == "function.call": + # handled in the extra.run.tools.create_function_result function + continue + elif output_type == "tool.execution": + _create_tool_execution_child_span(tracer, parent_context, output) + elif output_type == "message.output": + _create_message_output_child_span(tracer, parent_context, output) + # TODO: do type agent.handoff + + +def _enrich_ocr(span: Span, response_data: dict[str, Any]) -> None: + """Set OCR-specific usage attributes.""" + usage_info = response_data.get("usage_info", {}) + ocr_attributes = { + MistralAIAttributes.MISTRAL_AI_OCR_USAGE_PAGES_PROCESSED: usage_info.get( + "pages_processed" + ), + MistralAIAttributes.MISTRAL_AI_OCR_USAGE_DOC_SIZE_BYTES: usage_info.get( + "doc_size_bytes" + ), + } + set_available_attributes(span, ocr_attributes) + + +def _enrich_span_from_response( + tracer: Tracer, + span: Span, + operation_id: str, + response_data: dict[str, Any], +) -> None: + """Enrich span with GenAI response attributes and operation-specific data. + + Used by both the non-streaming and streaming paths so that the same + attributes are set regardless of response type. + """ + gen_ai_op = _infer_gen_ai_operation_name(operation_id) + if gen_ai_op is None: + return + + _enrich_response_genai_attrs(span, gen_ai_op, response_data) + + if gen_ai_op is gen_ai_attributes.GenAiOperationNameValues.CREATE_AGENT: + _enrich_create_agent(span, response_data) + elif gen_ai_op is gen_ai_attributes.GenAiOperationNameValues.INVOKE_AGENT: + _enrich_invoke_agent(tracer, span, response_data) + if operation_id == "ocr_v1_ocr_post": - usage_info = response_data.get("usage_info", "") - ocr_attributes = { - MistralAIAttributes.MISTRAL_AI_OPERATION_NAME: MistralAINameValues.OCR.value, - MistralAIAttributes.MISTRAL_AI_OCR_USAGE_PAGES_PROCESSED: usage_info.get("pages_processed", "") if usage_info else "", - MistralAIAttributes.MISTRAL_AI_OCR_USAGE_DOC_SIZE_BYTES: usage_info.get("doc_size_bytes", "") if usage_info else "", - gen_ai_attributes.GEN_AI_REQUEST_MODEL: response_data.get("model", "") - } - span.set_attributes(ocr_attributes) + _enrich_ocr(span, response_data) def get_or_create_otel_tracer() -> tuple[bool, Tracer]: @@ -226,6 +439,7 @@ def get_or_create_otel_tracer() -> tuple[bool, Tracer]: return tracing_enabled, tracer + def get_traced_request_and_span( tracing_enabled: bool, tracer: Tracer, @@ -233,26 +447,26 @@ def get_traced_request_and_span( operation_id: str, request: httpx.Request, ) -> tuple[httpx.Request, Span | None]: - if not tracing_enabled: - return request, span + if not tracing_enabled: + return request, span - try: - span = tracer.start_span(name=operation_id) - span.set_attributes({"agent.trace.public": ""}) - # Inject the span context into the request headers to be used by the backend service to continue the trace - propagate.inject(request.headers, context=set_span_in_context(span)) - span = enrich_span_from_request(span, request) - except Exception: - logger.warning( - "%s %s", - TracingErrors.FAILED_TO_CREATE_SPAN_FOR_REQUEST, - traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT, - ) - if span: - end_span(span=span) - span = None + try: + span = tracer.start_span(name=operation_id) + span.set_attributes({"agent.trace.public": ""}) + # Inject the span context into the request headers to be used by the backend service to continue the trace + propagate.inject(request.headers, context=set_span_in_context(span)) + span = enrich_span_from_request(span, operation_id, request) + except Exception: + logger.warning( + "%s %s", + TracingErrors.FAILED_TO_CREATE_SPAN_FOR_REQUEST, + traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT, + ) + if span: + end_span(span=span) + span = None - return request, span + return request, span def get_traced_response( @@ -265,12 +479,18 @@ def get_traced_response( if not tracing_enabled or not span: return response try: + span.set_status(Status(StatusCode.OK)) + span.set_attribute( + http_attributes.HTTP_RESPONSE_STATUS_CODE, response.status_code + ) is_stream_response = not response.is_closed and not response.is_stream_consumed if is_stream_response: - return TracedResponse.from_response(resp=response, span=span) - enrich_span_from_response( - tracer, span, operation_id, response - ) + return TracedResponse.from_response( + resp=response, span=span, tracer=tracer, operation_id=operation_id + ) + if response.content: + response_data = json.loads(response.content) + _enrich_span_from_response(tracer, span, operation_id, response_data) except Exception: logger.warning( "%s %s", @@ -281,6 +501,7 @@ def get_traced_response( end_span(span=span) return response + def get_response_and_error( tracing_enabled: bool, tracer: Tracer, @@ -289,38 +510,48 @@ def get_response_and_error( response: httpx.Response, error: Exception | None, ) -> tuple[httpx.Response, Exception | None]: - if not tracing_enabled or not span: - return response, error - try: - if error: - span.record_exception(error) - span.set_status(Status(StatusCode.ERROR, str(error))) - if hasattr(response, "_content") and response._content: - response_body = json.loads(response._content) - if response_body.get("object", "") == "error": - if error_msg := response_body.get("message", ""): - attributes = { - http_attributes.HTTP_RESPONSE_STATUS_CODE: response.status_code, - MistralAIAttributes.MISTRAL_AI_ERROR_TYPE: response_body.get("type", ""), - MistralAIAttributes.MISTRAL_AI_ERROR_MESSAGE: error_msg, - MistralAIAttributes.MISTRAL_AI_ERROR_CODE: response_body.get("code", ""), - } - for attribute, value in attributes.items(): - if value: - span.set_attribute(attribute, value) + if not tracing_enabled or not span: + return response, error + try: + if error: + span.record_exception(error) + span.set_status(Status(StatusCode.ERROR, str(error))) + if response.content: + response_body = json.loads(response.content) + if response_body.get("object", "") == "error": + if error_msg := response_body.get("message", ""): + error_type = response_body.get("type", "") + span.set_status(Status(StatusCode.ERROR, error_msg)) + span.add_event( + "exception", + { + "exception.type": error_type or "api_error", + "exception.message": error_msg, + }, + ) + attributes = { + http_attributes.HTTP_RESPONSE_STATUS_CODE: response.status_code, + error_attributes.ERROR_TYPE: error_type, + MistralAIAttributes.MISTRAL_AI_ERROR_CODE: response_body.get( + "code", "" + ), + } + for attribute, value in attributes.items(): + if value: + span.set_attribute(attribute, value) + span.end() + span = None + except Exception: + logger.warning( + "%s %s", + TracingErrors.FAILED_TO_HANDLE_ERROR_IN_SPAN, + traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT, + ) + + if span: span.end() span = None - except Exception: - logger.warning( - "%s %s", - TracingErrors.FAILED_TO_HANDLE_ERROR_IN_SPAN, - traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT, - ) - - if span: - span.end() - span = None - return response, error + return response, error def end_span(span: Span) -> None: @@ -333,34 +564,84 @@ def end_span(span: Span) -> None: traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT, ) + class TracedResponse(httpx.Response): + """Subclass of httpx.Response that accumulates streamed SSE bytes and + enriches the OTEL span with response attributes when the stream is closed. """ - TracedResponse is a subclass of httpx.Response that ends the span when the response is closed. - This hack allows ending the span only once the stream is fully consumed. - """ - def __init__(self, *args, span: Span | None, **kwargs) -> None: + span: Span | None + tracer: Tracer + operation_id: str + _accumulated_sse: bytearray + + def __init__( + self, + *args, + span: Span | None, + tracer: Tracer, + operation_id: str = "", + **kwargs, + ) -> None: super().__init__(*args, **kwargs) self.span = span + self.tracer = tracer + self.operation_id = operation_id + self._accumulated_sse = bytearray() + + def iter_bytes(self, *args, **kwargs): + for chunk in super().iter_bytes(*args, **kwargs): + self._accumulated_sse.extend(chunk) + yield chunk + + async def aiter_bytes(self, *args, **kwargs): + async for chunk in super().aiter_bytes(*args, **kwargs): + self._accumulated_sse.extend(chunk) + yield chunk def close(self) -> None: - if self.span: - end_span(span=self.span) + self._finalize_span() super().close() async def aclose(self) -> None: - if self.span: - end_span(span=self.span) + self._finalize_span() await super().aclose() + def _finalize_span(self) -> None: + """Enrich and end the span after the stream has been fully consumed.""" + if not self.span: + return + try: + chunks = parse_sse_chunks(bytes(self._accumulated_sse)) + if chunks: + response_data = accumulate_chunks_to_response_dict(chunks) + _enrich_span_from_response( + self.tracer, self.span, self.operation_id, response_data + ) + except Exception: + logger.warning( + "%s %s", + TracingErrors.FAILED_TO_ENRICH_SPAN_WITH_RESPONSE, + traceback.format_exc() if MISTRAL_SDK_DEBUG_TRACING else DEBUG_HINT, + ) + end_span(span=self.span) + self.span = None + @classmethod - def from_response(cls, resp: httpx.Response, span: Span | None) -> "TracedResponse": + def from_response( + cls, + resp: httpx.Response, + span: Span | None, + tracer: Tracer, + operation_id: str = "", + ) -> "TracedResponse": + # Bypass __init__ to steal the live httpx stream/connection via __dict__ copy. + # Keep tracing field assignments in sync with __init__. traced_resp = cls.__new__(cls) traced_resp.__dict__ = copy.copy(resp.__dict__) traced_resp.span = span - - # Warning: this syntax bypasses the __init__ method. - # If you add init logic in the TracedResponse.__init__ method, you will need to add the following line for it to execute: - # traced_resp.__init__(your_arguments) + traced_resp.tracer = tracer + traced_resp.operation_id = operation_id + traced_resp._accumulated_sse = bytearray() return traced_resp diff --git a/src/mistralai/extra/observability/serialization.py b/src/mistralai/extra/observability/serialization.py new file mode 100644 index 00000000..de3bfce2 --- /dev/null +++ b/src/mistralai/extra/observability/serialization.py @@ -0,0 +1,156 @@ +"""Serialization helpers for converting Mistral API payloads to OTEL GenAI convention formats. + +These are pure functions with no OTEL dependencies — they transform dicts to JSON strings +matching the GenAI semantic convention schemas for input/output messages and tool definitions. + +Schemas: +- Input messages: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-input-messages.json +- Output messages: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-output-messages.json +- Tool definitions: https://github.com/Cirilla-zmh/semantic-conventions/blob/cc4d07e7e56b80e9aa5904a3d524c134699da37f/docs/gen-ai/gen-ai-tool-definitions.json +""" + +import json +from typing import Any + + +def _content_to_parts(content) -> list[dict]: + """Convert Mistral message content to OTEL parts array. + + Mistral content is either a string or an array of content chunks. + """ + if content is None: + return [] + if isinstance(content, str): + return [{"type": "text", "content": content}] + # Content chunks array — map known Mistral types to OTEL part types + parts = [] + for chunk in content: + if isinstance(chunk, str): + parts.append({"type": "text", "content": chunk}) + elif isinstance(chunk, dict): + chunk_type = chunk.get("type", "") + if chunk_type == "text": + parts.append({"type": "text", "content": chunk.get("text", "")}) + elif chunk_type == "thinking": + thinking = chunk.get("thinking", "") + if isinstance(thinking, list): + text_parts = [ + sub.get("text", "") + for sub in thinking + if isinstance(sub, dict) and sub.get("type") == "text" + ] + content_str = "\n".join(text_parts) + else: # Fallback + content_str = str(thinking) + parts.append({"type": "reasoning", "content": content_str}) + elif chunk_type == "image_url": + url = chunk.get("image_url", {}) + uri = url.get("url", "") if isinstance(url, dict) else str(url) + parts.append({"type": "uri", "modality": "image", "uri": uri}) + else: + # Catch-all for other content chunk types + parts.append({"type": chunk_type}) + return parts + + +def _tool_calls_to_parts(tool_calls: list[dict] | None) -> list[dict]: + """Convert Mistral tool_calls to OTEL ToolCallRequestPart entries.""" + if not tool_calls: + return [] + parts = [] + for tc in tool_calls: + func = tc.get("function", {}) or {} + part: dict = { + "type": "tool_call", + "name": func.get("name", ""), + } + if (tc_id := tc.get("id")) is not None: + part["id"] = tc_id + if (arguments := func.get("arguments")) is not None: + part["arguments"] = arguments + parts.append(part) + return parts + + +def serialize_input_message(message: dict[str, Any]) -> str: + """Serialize a single input message per the OTEL GenAI convention. + + Schema: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-input-messages.json + ChatMessage: {role (required), parts (required), name?} + + Conversation entry objects (e.g. function.result) don't carry a "role" + field — they are detected via their "type" and mapped to the closest + OTEL role. + """ + entry_type = message.get("type") + + # Conversation entry: function.result → OTEL tool role + if entry_type == "function.result": + part: dict = {"type": "tool_call_response", "response": message.get("result")} + if (tool_call_id := message.get("tool_call_id")) is not None: + part["id"] = tool_call_id + return json.dumps({"role": "tool", "parts": [part]}) + + # TODO: may need to handle other types for conversations (e.g. agent handoff) + + role = message.get("role", "unknown") + parts: list[dict] = [] + + if role == "tool": + # Tool messages are responses to tool calls + tool_part: dict = { + "type": "tool_call_response", + "response": message.get("content"), + } + if (tool_call_id := message.get("tool_call_id")) is not None: + tool_part["id"] = tool_call_id + parts.append(tool_part) + else: + parts.extend(_content_to_parts(message.get("content"))) + parts.extend(_tool_calls_to_parts(message.get("tool_calls"))) + + return json.dumps({"role": role, "parts": parts}) + + +def serialize_output_message(choice: dict[str, Any]) -> str: + """Serialize a single output choice/message per the OTEL GenAI convention. + + Schema: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-output-messages.json + OutputMessage: {role (required), parts (required), finish_reason (required), name?} + """ + message = choice.get("message", {}) or {} + parts: list[dict] = [] + parts.extend(_content_to_parts(message.get("content"))) + parts.extend(_tool_calls_to_parts(message.get("tool_calls"))) + + return json.dumps( + { + "role": message.get("role", "assistant"), + "parts": parts, + "finish_reason": choice.get("finish_reason", ""), + } + ) + + +def serialize_tool_definition(tool: dict[str, Any]) -> str | None: + """Flatten a Mistral tool definition to the OTEL GenAI convention schema. + + Mistral format: {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}} + OTEL format: {"type": "function", "name": ..., "description": ..., "parameters": ...} + + Schema, still under review: https://github.com/Cirilla-zmh/semantic-conventions/blob/cc4d07e7e56b80e9aa5904a3d524c134699da37f/docs/gen-ai/gen-ai-tool-definitions.json + """ + # Early exit conditions: only functions supported for now, and name is required + type = tool.get("type", "function") + func = tool.get("function") + if not func: + return None + name = func.get("name") + if not name: + return None + serialized: dict = {"type": type, "name": name} + if (description := func.get("description")) is not None: + serialized["description"] = description + if (parameters := func.get("parameters")) is not None: + serialized["parameters"] = parameters + return json.dumps(serialized) diff --git a/src/mistralai/extra/observability/streaming.py b/src/mistralai/extra/observability/streaming.py new file mode 100644 index 00000000..3eb9423a --- /dev/null +++ b/src/mistralai/extra/observability/streaming.py @@ -0,0 +1,98 @@ +"""Streaming response helpers for OTEL tracing. + +Pure functions that parse SSE byte streams and accumulate CompletionChunk +deltas into a ChatCompletionResponse-shaped dict suitable for span enrichment. + +TODO: supports chat and agent completion streaming endpoints. Evolutions will +be necessary to support other streaming endpoints (e.g. conversations). + +NOTE: The SSE bytes are re-parsed here even though EventStream already +parsed them during iteration. +TracedResponse sits below EventStream and can only accumulate raw bytes; it +has no access to the decoded events. Hooking into EventStream could eliminate +this double-parse, but EventStream is Speakeasy-generated code. +""" + +from typing import Any + +from mistralai.client.models import CompletionChunk, UsageInfo + + +def parse_sse_chunks(raw_sse_bytes: bytes) -> list[CompletionChunk]: + """Parse raw SSE bytes into a list of typed CompletionChunk models. + + Only CompletionChunk is handled. If new SSE-streamed response types + are added, parsing and typing here will need updating. + """ + chunks: list[CompletionChunk] = [] + text = raw_sse_bytes.decode("utf-8", errors="replace") + for line in text.split("\n"): + line = line.strip() + if not line.startswith("data: "): + continue + payload = line[6:] + if payload == "[DONE]": + continue + try: + chunks.append(CompletionChunk.model_validate_json(payload)) + except Exception: + continue + return chunks + + +def accumulate_chunks_to_response_dict( + chunks: list[CompletionChunk], +) -> dict[str, Any]: + """Accumulate streaming CompletionChunk deltas into a ChatCompletionResponse-shaped dict.""" + response_id: str | None = None + model: str | None = None + usage: UsageInfo | None = None + choices: dict[int, dict[str, Any]] = {} + + for chunk in chunks: + response_id = response_id or chunk.id + model = model or chunk.model + usage = usage or chunk.usage + + for choice in chunk.choices: + accumulated = choices.setdefault( + choice.index, + { + "message": {"role": "assistant", "content": ""}, + "finish_reason": "", + }, + ) + msg = accumulated["message"] + delta = choice.delta + if isinstance(delta.role, str): + msg["role"] = delta.role + if isinstance(delta.content, str) and delta.content: + msg["content"] += delta.content + if isinstance(choice.finish_reason, str): + accumulated["finish_reason"] = choice.finish_reason + if isinstance(delta.tool_calls, list): + tc_list = msg.setdefault("tool_calls", []) + for tc in delta.tool_calls: + tc_idx = tc.index if tc.index is not None else len(tc_list) + while len(tc_list) <= tc_idx: + tc_list.append( + {"id": None, "function": {"name": "", "arguments": ""}} + ) + # ToolCall.id defaults to the string "null" (Speakeasy codegen quirk) + if tc.id is not None and tc.id != "null": + tc_list[tc_idx]["id"] = tc.id + if tc.function.name: + tc_list[tc_idx]["function"]["name"] += tc.function.name + if isinstance(tc.function.arguments, str) and tc.function.arguments: + tc_list[tc_idx]["function"]["arguments"] += ( + tc.function.arguments + ) + + result: dict[str, Any] = { + "id": response_id, + "model": model, + "choices": [choices[idx] for idx in sorted(choices)], + } + if usage is not None: + result["usage"] = usage.model_dump(mode="json", by_alias=True) + return result diff --git a/src/mistralai/extra/run/tools.py b/src/mistralai/extra/run/tools.py index 18c1d3dd..95dc21a9 100644 --- a/src/mistralai/extra/run/tools.py +++ b/src/mistralai/extra/run/tools.py @@ -8,26 +8,28 @@ import opentelemetry.semconv._incubating.attributes.gen_ai_attributes as gen_ai_attributes from griffe import ( Docstring, - DocstringSectionKind, - DocstringSectionText, DocstringParameter, DocstringSection, + DocstringSectionKind, + DocstringSectionText, ) from opentelemetry import trace +from opentelemetry.trace import Status, StatusCode from pydantic import Field, create_model from pydantic.fields import FieldInfo -from mistralai.extra.exceptions import RunException -from mistralai.extra.mcp.base import MCPClientProtocol -from mistralai.extra.observability.otel import GenAISpanEnum, MistralAIAttributes, set_available_attributes -from mistralai.extra.run.result import RunOutputEntries from mistralai.client.models import ( - FunctionResultEntry, - FunctionTool, Function, FunctionCallEntry, + FunctionResultEntry, + FunctionTool, ) - +from mistralai.extra.exceptions import RunException +from mistralai.extra.mcp.base import MCPClientProtocol +from mistralai.extra.observability.otel import ( + set_available_attributes, +) +from mistralai.extra.run.result import RunOutputEntries logger = logging.getLogger(__name__) @@ -193,22 +195,35 @@ async def create_function_result( else function_call.arguments ) tracer = trace.get_tracer(__name__) - with tracer.start_as_current_span(GenAISpanEnum.function_call(function_call.name)) as span: + with tracer.start_as_current_span( + f"{gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value} {function_call.name}" + ) as span: + # Always record identity attributes so the span is useful even on error + function_call_attributes = { + gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value, + gen_ai_attributes.GEN_AI_PROVIDER_NAME: gen_ai_attributes.GenAiProviderNameValues.MISTRAL_AI.value, + gen_ai_attributes.GEN_AI_TOOL_CALL_ID: function_call.id, + gen_ai_attributes.GEN_AI_TOOL_CALL_ARGUMENTS: function_call.arguments + if isinstance(function_call.arguments, str) + else json.dumps(function_call.arguments), + gen_ai_attributes.GEN_AI_TOOL_NAME: function_call.name, + gen_ai_attributes.GEN_AI_TOOL_TYPE: "function", + } + set_available_attributes(span, function_call_attributes) try: if isinstance(run_tool, RunFunction): res = run_tool.callable(**arguments) elif isinstance(run_tool, RunCoroutine): res = await run_tool.awaitable(**arguments) elif isinstance(run_tool, RunMCPTool): - res = await run_tool.mcp_client.execute_tool(function_call.name, arguments) - function_call_attributes = { - gen_ai_attributes.GEN_AI_OPERATION_NAME: gen_ai_attributes.GenAiOperationNameValues.EXECUTE_TOOL.value, - gen_ai_attributes.GEN_AI_TOOL_CALL_ID: function_call.id, - MistralAIAttributes.MISTRAL_AI_TOOL_CALL_ARGUMENTS: str(function_call.arguments), - gen_ai_attributes.GEN_AI_TOOL_NAME: function_call.name - } - set_available_attributes(span, function_call_attributes) + res = await run_tool.mcp_client.execute_tool( + function_call.name, arguments + ) + result_str = res if isinstance(res, str) else json.dumps(res) + span.set_attribute(gen_ai_attributes.GEN_AI_TOOL_CALL_RESULT, result_str) except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) if continue_on_fn_error is True: return FunctionResultEntry( tool_call_id=function_call.tool_call_id, @@ -219,8 +234,7 @@ async def create_function_result( ) from e return FunctionResultEntry( - tool_call_id=function_call.tool_call_id, - result=res if isinstance(res, str) else json.dumps(res), + tool_call_id=function_call.tool_call_id, result=result_str ) diff --git a/src/mistralai/extra/tests/test_otel_tracing.py b/src/mistralai/extra/tests/test_otel_tracing.py new file mode 100644 index 00000000..1c78e5b1 --- /dev/null +++ b/src/mistralai/extra/tests/test_otel_tracing.py @@ -0,0 +1,1529 @@ +"""Tests for OTEL tracing instrumentation. + +Each test drives the real TracingHook lifecycle (before_request → after_success) +with realistic Mistral API payloads and verifies the resulting OTEL span attributes +match GenAI semantic conventions. + +Fixtures are defined inline using SDK model classes so each test is self-contained. +""" + +# pyright: reportOptionalSubscript=false +# pyright: reportOptionalMemberAccess=false +# pyright: reportArgumentType=false + +import asyncio +import json +import unittest +from datetime import datetime, timezone +from unittest.mock import MagicMock + +import httpx +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import StatusCode + +from mistralai.client._hooks.tracing import TracingHook +from mistralai.client._hooks.types import ( + AfterErrorContext, + AfterSuccessContext, + BeforeRequestContext, + HookContext, +) +from mistralai.client.models import ( + Agent, + AgentsCompletionRequest, + AssistantMessage, + ChatCompletionChoice, + ChatCompletionRequest, + ChatCompletionResponse, + CompletionChunk, + CompletionEvent, + CompletionResponseStreamChoice, + ConversationAppendRequest, + ConversationRequest, + ConversationResponse, + ConversationUsageInfo, + CreateAgentRequest, + DeltaMessage, + EmbeddingRequest, + EmbeddingResponse, + EmbeddingResponseData, + Function, + FunctionCall, + FunctionCallEntry, + FunctionResultEntry, + FunctionTool, + ImageURL, + ImageURLChunk, + MessageOutputEntry, + SystemMessage, + TextChunk, + ThinkChunk, + Tool, + ToolCall, + ToolExecutionEntry, + ToolMessage, + UsageInfo, + UserMessage, +) +from mistralai.extra.observability.otel import TracedResponse +from mistralai.extra.run.tools import ( + RunFunction, + create_function_result, +) + +# Set up a single TracerProvider for the entire test module. +# trace.set_tracer_provider() can only be called once per process. +_EXPORTER = InMemorySpanExporter() +_PROVIDER = TracerProvider() +_PROVIDER.add_span_processor(SimpleSpanProcessor(_EXPORTER)) +trace.set_tracer_provider(_PROVIDER) + + +# -- Helpers ------------------------------------------------------------------- + + +def _make_httpx_request( + body: dict, + method: str = "POST", + url: str = "https://api.mistral.ai/v1/chat/completions", +) -> httpx.Request: + return httpx.Request( + method=method, + url=url, + content=json.dumps(body).encode(), + headers={"host": "api.mistral.ai", "content-type": "application/json"}, + ) + + +def _make_httpx_response(body: dict, status_code: int = 200) -> httpx.Response: + resp = httpx.Response( + status_code=status_code, + content=json.dumps(body).encode(), + ) + # Mark the response as closed/consumed so it's treated as non-streaming + resp.stream = httpx.ByteStream(resp.content) + resp.stream.close() + return resp + + +def _make_hook_context(operation_id: str) -> HookContext: + return HookContext( + config=MagicMock(), + base_url="https://api.mistral.ai", + operation_id=operation_id, + oauth2_scopes=None, + security_source=None, + ) + + +def _dump(model) -> dict: + """Serialize an SDK model to a JSON-compatible dict, matching wire format.""" + return model.model_dump(mode="json", by_alias=True) + + +def _build_sse_body(events: list[CompletionEvent]) -> bytes: + """Serialize a list of CompletionEvent models into an SSE byte payload.""" + lines = [f"data: {json.dumps(_dump(e.data))}" for e in events] + lines.append("data: [DONE]") + return ("\n\n".join(lines) + "\n\n").encode() + + +def _make_streaming_httpx_response(sse_body: bytes) -> httpx.Response: + """Create an *open* httpx.Response that simulates a streaming SSE response.""" + return httpx.Response( + status_code=200, + stream=httpx.ByteStream(sse_body), + ) + + +def _parse_json_list(span_attr): + """Parse a span attribute containing a list of JSON-encoded strings.""" + return [json.loads(m) for m in span_attr] + + +# -- Tests --------------------------------------------------------------------- + + +class TestOtelTracing(unittest.TestCase): + def setUp(self): + _EXPORTER.clear() + + # -- Test helpers ---------------------------------------------------------- + + def _run_hook_lifecycle( + self, + operation_id: str, + request_body, + response_body, + streaming: bool = False, + ): + """Drive the real TracingHook: before_request → after_success. + + ``request_body`` and ``response_body`` can be SDK model instances or + plain dicts. Models are serialised via ``_dump()`` automatically. + + When ``streaming=True``, ``response_body`` must be a + ``list[CompletionEvent]``. The helper builds an SSE byte payload, + creates an open streaming response, and consumes + closes the stream + so the span is finalised before returning. + """ + hook = TracingHook() + hook_ctx = _make_hook_context(operation_id) + + req_dict = ( + _dump(request_body) if hasattr(request_body, "model_dump") else request_body + ) + + request = _make_httpx_request(req_dict) + + if streaming: + sse_body = _build_sse_body(response_body) + response = _make_streaming_httpx_response(sse_body) + else: + resp_dict = ( + _dump(response_body) + if hasattr(response_body, "model_dump") + else response_body + ) + response = _make_httpx_response(resp_dict) + + hooked_request = hook.before_request(BeforeRequestContext(hook_ctx), request) + self.assertNotIsInstance(hooked_request, Exception) + assert isinstance(hooked_request, httpx.Request) + + result = hook.after_success(AfterSuccessContext(hook_ctx), response) + self.assertNotIsInstance(result, Exception) + + if streaming: + self.assertIsInstance(result, TracedResponse) + assert isinstance(result, TracedResponse) + for _chunk in result.iter_bytes(): + pass + result.close() + + def _run_hook_error_lifecycle( + self, + operation_id: str, + request_body, + response_body: dict, + status_code: int = 400, + error: Exception | None = None, + ): + """Drive the real TracingHook: before_request → after_error.""" + hook = TracingHook() + hook_ctx = _make_hook_context(operation_id) + + req_dict = ( + _dump(request_body) if hasattr(request_body, "model_dump") else request_body + ) + request = _make_httpx_request(req_dict) + response = _make_httpx_response(response_body, status_code=status_code) + + hooked_request = hook.before_request(BeforeRequestContext(hook_ctx), request) + self.assertNotIsInstance(hooked_request, Exception) + assert isinstance(hooked_request, httpx.Request) + + result = hook.after_error(AfterErrorContext(hook_ctx), response, error) + self.assertNotIsInstance(result, Exception) + + def _get_finished_spans(self): + return _EXPORTER.get_finished_spans() + + def _get_single_span(self): + spans = self._get_finished_spans() + self.assertEqual(len(spans), 1, f"Expected 1 span, got {len(spans)}") + return spans[0] + + def assertSpanAttributes(self, span, expected: dict): + """Assert that *expected* is a subset of *span.attributes*.""" + actual = {k: span.attributes[k] for k in expected} + self.assertEqual(expected, actual) + + # -- Simple chat completion ------------------------------------------------ + + def test_simple_chat_completion(self): + request = ChatCompletionRequest( + model="mistral-large-latest", + temperature=0.7, + top_p=1, + max_tokens=512, + messages=[ + SystemMessage(content="You are a helpful assistant."), + UserMessage(content="What is the best French cheese?"), + ], + ) + response = ChatCompletionResponse( + id="cmpl-a1b2c3d4e5f6", + object="chat.completion", + model="mistral-large-latest", + created=1700000000, + choices=[ + ChatCompletionChoice( + index=0, + message=AssistantMessage( + content="There are many great French cheeses! Camembert, Roquefort, and Brie are among the most celebrated.", + tool_calls=None, + ), + finish_reason="stop", + ), + ], + usage=UsageInfo(prompt_tokens=20, completion_tokens=25, total_tokens=45), + ) + + self._run_hook_lifecycle( + "chat_completion_v1_chat_completions_post", + request, + response, + ) + span = self._get_single_span() + + self.assertEqual(span.name, "chat mistral-large-latest") + self.assertSpanAttributes( + span, + { + "gen_ai.operation.name": "chat", + "gen_ai.provider.name": "mistral_ai", + "gen_ai.request.model": "mistral-large-latest", + "gen_ai.request.temperature": 0.7, + "gen_ai.request.top_p": 1, + "gen_ai.request.max_tokens": 512, + "http.request.method": "POST", + "server.address": "api.mistral.ai", + "server.port": 443, + "http.response.status_code": 200, + "gen_ai.response.id": "cmpl-a1b2c3d4e5f6", + "gen_ai.response.model": "mistral-large-latest", + "gen_ai.response.finish_reasons": ("stop",), + "gen_ai.usage.input_tokens": 20, + "gen_ai.usage.output_tokens": 25, + }, + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.input.messages"]), + [ + { + "role": "system", + "parts": [ + {"type": "text", "content": "You are a helpful assistant."} + ], + }, + { + "role": "user", + "parts": [ + {"type": "text", "content": "What is the best French cheese?"} + ], + }, + ], + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": "There are many great French cheeses! Camembert, Roquefort, and Brie are among the most celebrated.", + } + ], + "finish_reason": "stop", + }, + ], + ) + + # -- Chat completion with tool calls --------------------------------------- + + def test_chat_completion_with_tool_calls(self): + request = ChatCompletionRequest( + model="mistral-large-latest", + messages=[ + UserMessage(content="What's the weather in Paris?"), + ], + tools=[ + Tool( + type="function", + function=Function( + name="get_weather", + description="Get the current weather in a given location", + parameters={ + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "City name", + }, + }, + "required": ["location"], + }, + ), + ), + ], + tool_choice="auto", + ) + response = ChatCompletionResponse( + id="cmpl-tool-001", + object="chat.completion", + model="mistral-large-latest", + created=1700000001, + choices=[ + ChatCompletionChoice( + index=0, + message=AssistantMessage( + content="", + tool_calls=[ + ToolCall( + id="call_abc123", + function=FunctionCall( + name="get_weather", + arguments='{"location": "Paris"}', + ), + ), + ], + ), + finish_reason="tool_calls", + ), + ], + usage=UsageInfo(prompt_tokens=30, completion_tokens=15, total_tokens=45), + ) + + self._run_hook_lifecycle( + "chat_completion_v1_chat_completions_post", + request, + response, + ) + span = self._get_single_span() + + self.assertEqual(span.name, "chat mistral-large-latest") + self.assertSpanAttributes( + span, + { + "gen_ai.operation.name": "chat", + "gen_ai.response.finish_reasons": ("tool_calls",), + "gen_ai.usage.input_tokens": 30, + "gen_ai.usage.output_tokens": 15, + }, + ) + + # Tool definitions in request + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.tool.definitions"]), + [ + { + "type": "function", + "name": "get_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "City name"} + }, + "required": ["location"], + }, + }, + ], + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.input.messages"]), + [ + { + "role": "user", + "parts": [ + {"type": "text", "content": "What's the weather in Paris?"} + ], + }, + ], + ) + + # Output messages — assistant with tool call and empty text content + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + {"type": "text", "content": ""}, + { + "type": "tool_call", + "name": "get_weather", + "id": "call_abc123", + "arguments": '{"location": "Paris"}', + }, + ], + "finish_reason": "tool_calls", + }, + ], + ) + + # -- Embeddings ------------------------------------------------------------ + + def test_embeddings(self): + request = EmbeddingRequest( + model="mistral-embed", + inputs=["What is the best French cheese?"], + ) + response = EmbeddingResponse( + id="emb-a1b2c3", + object="list", + model="mistral-embed", + data=[ + EmbeddingResponseData( + object="embedding", embedding=[0.1, 0.2, 0.3], index=0 + ) + ], + usage=UsageInfo(prompt_tokens=10, completion_tokens=0, total_tokens=10), + ) + + self._run_hook_lifecycle( + "embeddings_v1_embeddings_post", + request, + response, + ) + span = self._get_single_span() + + self.assertEqual(span.name, "embeddings mistral-embed") + self.assertSpanAttributes( + span, + { + "gen_ai.operation.name": "embeddings", + "gen_ai.provider.name": "mistral_ai", + "gen_ai.request.model": "mistral-embed", + "gen_ai.response.id": "emb-a1b2c3", + "gen_ai.response.model": "mistral-embed", + "gen_ai.usage.input_tokens": 10, + }, + ) + + # Embeddings have no messages or choices + self.assertNotIn("gen_ai.input.messages", span.attributes) + self.assertNotIn("gen_ai.output.messages", span.attributes) + self.assertNotIn("gen_ai.response.finish_reasons", span.attributes) + + # -- Create agent ---------------------------------------------------------- + + def test_create_agent(self): + request = CreateAgentRequest( + model="mistral-large-latest", + name="my-test-agent", + description="A helpful test agent", + instructions="You are a helpful test assistant. Be concise.", + tools=[ + FunctionTool( + function=Function( + name="get_weather", + description="Get weather", + parameters={ + "type": "object", + "properties": {"location": {"type": "string"}}, + }, + ), + ), + ], + ) + response = Agent( + id="agent-xyz-789", + object="agent", + model="mistral-large-latest", + name="my-test-agent", + version=0, + versions=[], + description="A helpful test agent", + instructions="You are a helpful test assistant. Be concise.", + tools=[], + created_at=datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc), + updated_at=datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc), + deployment_chat=False, + source="api", + ) + + self._run_hook_lifecycle( + "agents_api_v1_agents_create", + request, + response, + ) + span = self._get_single_span() + + self.assertEqual(span.name, "create_agent my-test-agent") + self.assertSpanAttributes( + span, + { + "gen_ai.operation.name": "create_agent", + "gen_ai.provider.name": "mistral_ai", + "gen_ai.agent.id": "agent-xyz-789", + "gen_ai.agent.name": "my-test-agent", + "gen_ai.agent.description": "A helpful test agent", + "gen_ai.system_instructions": "You are a helpful test assistant. Be concise.", + "gen_ai.agent.version": "0", + "gen_ai.request.model": "mistral-large-latest", + }, + ) + + # response.id should NOT be set (id means agent id for create_agent) + self.assertNotIn("gen_ai.response.id", span.attributes) + + # -- Agent completion (via /v1/agents/completions) ------------------------- + + def test_agent_completion(self): + request = AgentsCompletionRequest( + agent_id="agent-xyz-789", + messages=[ + UserMessage(content="What's the weather in Paris?"), + ], + max_tokens=1024, + ) + response = ChatCompletionResponse( + id="cmpl-agent-001", + object="chat.completion", + model="mistral-large-latest", + created=1700000002, + choices=[ + ChatCompletionChoice( + index=0, + message=AssistantMessage( + content="It's sunny and 22C in Paris today.", + tool_calls=None, + ), + finish_reason="stop", + ), + ], + usage=UsageInfo(prompt_tokens=40, completion_tokens=12, total_tokens=52), + ) + + self._run_hook_lifecycle( + "agents_completion_v1_agents_completions_post", + request, + response, + ) + span = self._get_single_span() + + # Span name — no agent name in request body, falls back to op name + self.assertEqual(span.name, "invoke_agent") + self.assertSpanAttributes( + span, + { + "gen_ai.operation.name": "invoke_agent", + "gen_ai.provider.name": "mistral_ai", + "gen_ai.response.id": "cmpl-agent-001", + "gen_ai.response.model": "mistral-large-latest", + "gen_ai.response.finish_reasons": ("stop",), + "gen_ai.usage.input_tokens": 40, + "gen_ai.usage.output_tokens": 12, + }, + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.input.messages"]), + [ + { + "role": "user", + "parts": [ + {"type": "text", "content": "What's the weather in Paris?"} + ], + }, + ], + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": "It's sunny and 22C in Paris today.", + } + ], + "finish_reason": "stop", + }, + ], + ) + + # -- Conversation start (via /v1/conversations) ---------------------------- + + def test_conversation_start(self): + request = ConversationRequest( + agent_id="agent-xyz-789", + inputs="What's the weather in Paris?", + ) + response = ConversationResponse( + conversation_id="conv-001", + object="conversation.response", + usage=ConversationUsageInfo( + prompt_tokens=15, completion_tokens=10, total_tokens=25 + ), + outputs=[ + ToolExecutionEntry( + name="get_weather", + arguments='{"location": "Paris"}', + id="tool-exec-001", + info={"temperature": "22C", "condition": "sunny"}, + created_at=datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc), + completed_at=datetime(2024, 6, 1, 12, 0, 1, tzinfo=timezone.utc), + ), + MessageOutputEntry( + id="msg-out-001", + role="assistant", + content="It's sunny and 22C in Paris today.", + model="mistral-large-latest", + agent_id="agent-xyz-789", + created_at=datetime(2024, 6, 1, 12, 0, 1, tzinfo=timezone.utc), + completed_at=datetime(2024, 6, 1, 12, 0, 2, tzinfo=timezone.utc), + ), + ], + ) + + self._run_hook_lifecycle( + "agents_api_v1_conversations_start", + request, + response, + ) + spans = self._get_finished_spans() + + # Parent span + 2 child spans (tool execution + message output) + self.assertEqual(len(spans), 3, f"Expected 3 spans, got {len(spans)}") + + # Identify spans by operation name + parent = None + tool_span = None + message_span = None + for s in spans: + op = s.attributes.get("gen_ai.operation.name") + if op == "invoke_agent": + parent = s + elif op == "execute_tool": + tool_span = s + elif op == "chat": + message_span = s + + self.assertIsNotNone(parent, "Missing invoke_agent parent span") + self.assertIsNotNone(tool_span, "Missing execute_tool child span") + self.assertIsNotNone(message_span, "Missing chat child span") + + # Parent span + self.assertSpanAttributes( + parent, + { + "gen_ai.operation.name": "invoke_agent", + "gen_ai.provider.name": "mistral_ai", + "gen_ai.conversation.id": "conv-001", + "gen_ai.usage.input_tokens": 15, + "gen_ai.usage.output_tokens": 10, + }, + ) + + self.assertListEqual( + _parse_json_list(parent.attributes["gen_ai.input.messages"]), + [ + { + "role": "user", + "parts": [ + {"type": "text", "content": "What's the weather in Paris?"} + ], + }, + ], + ) + + # Parent span should NOT have output messages (they belong on child spans) + self.assertNotIn("gen_ai.output.messages", parent.attributes) + + # Tool execution child span + self.assertEqual(tool_span.name, "execute_tool get_weather") + self.assertSpanAttributes( + tool_span, + { + "gen_ai.operation.name": "execute_tool", + "gen_ai.provider.name": "mistral_ai", + "gen_ai.tool.name": "get_weather", + "gen_ai.tool.call.id": "tool-exec-001", + "gen_ai.tool.call.arguments": '{"location": "Paris"}', + "gen_ai.tool.type": "extension", + }, + ) + self.assertEqual( + json.loads(tool_span.attributes["gen_ai.tool.call.result"]), + {"temperature": "22C", "condition": "sunny"}, + ) + self.assertEqual(tool_span.parent.span_id, parent.context.span_id) + + # Message output child span + self.assertEqual(message_span.name, "chat mistral-large-latest") + self.assertSpanAttributes( + message_span, + { + "gen_ai.operation.name": "chat", + "gen_ai.response.id": "msg-out-001", + "gen_ai.agent.id": "agent-xyz-789", + "gen_ai.response.model": "mistral-large-latest", + }, + ) + self.assertEqual(message_span.parent.span_id, parent.context.span_id) + + self.assertListEqual( + _parse_json_list(message_span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": "It's sunny and 22C in Paris today.", + } + ], + "finish_reason": "", + }, + ], + ) + + # -- Conversation append --------------------------------------------------- + + def test_conversation_append_with_function_results(self): + """Conversation append with FunctionResultEntry inputs must serialize them as tool messages.""" + request = ConversationAppendRequest( + inputs=[ + FunctionResultEntry( + tool_call_id="tc-001", + result='{"status": "Completed"}', + ), + FunctionResultEntry( + tool_call_id="tc-002", + result='{"date": "2021-10-05"}', + ), + ], + ) + response = ConversationResponse( + conversation_id="conv-001", + object="conversation.response", + usage=ConversationUsageInfo( + prompt_tokens=20, completion_tokens=15, total_tokens=35 + ), + outputs=[ + MessageOutputEntry( + id="msg-out-002", + role="assistant", + content="Transaction T1001 was completed on 2021-10-05.", + model="mistral-large-latest", + agent_id="agent-xyz-789", + created_at=datetime(2024, 6, 1, 12, 1, 0, tzinfo=timezone.utc), + completed_at=datetime(2024, 6, 1, 12, 1, 1, tzinfo=timezone.utc), + ), + ], + ) + + self._run_hook_lifecycle( + "agents_api_v1_conversations_append", + _dump(request), + _dump(response), + ) + spans = self._get_finished_spans() + + # Parent span + 1 child span (message output) + self.assertEqual(len(spans), 2, f"Expected 2 spans, got {len(spans)}") + + parent = None + message_span = None + for s in spans: + op = s.attributes.get("gen_ai.operation.name") + if op == "invoke_agent" and s.parent is None: + parent = s + elif op == "chat": + message_span = s + + self.assertIsNotNone(parent, "Missing invoke_agent parent span") + self.assertIsNotNone(message_span, "Missing chat child span") + + # Parent span — input messages must contain the function results + self.assertListEqual( + _parse_json_list(parent.attributes["gen_ai.input.messages"]), + [ + { + "role": "tool", + "parts": [ + { + "type": "tool_call_response", + "response": '{"status": "Completed"}', + "id": "tc-001", + }, + ], + }, + { + "role": "tool", + "parts": [ + { + "type": "tool_call_response", + "response": '{"date": "2021-10-05"}', + "id": "tc-002", + }, + ], + }, + ], + ) + + # -- Non-GenAI operation --------------------------------------------------- + + def test_non_genai_operation(self): + self._run_hook_lifecycle( + "files_api_routes_upload_file", + {"file": "data"}, + {"id": "file-123", "object": "file"}, + ) + span = self._get_single_span() + self.assertNotIn("gen_ai.operation.name", span.attributes) + self.assertNotIn("gen_ai.provider.name", span.attributes) + self.assertEqual(span.attributes["http.request.method"], "POST") + + # -- Multi-turn tool use --------------------------------------------------- + + def test_multi_turn_tool_use(self): + """Full tool-use loop: user → assistant(tool_calls) → tool(result) → assistant(final). + + Tests that all message roles are serialised correctly in + gen_ai.input.messages, including the tool_call_response part for + role="tool" and the tool_call parts for role="assistant". + """ + request = ChatCompletionRequest( + model="mistral-small-latest", + max_tokens=64, + messages=[ + UserMessage(content="What is the weather in Paris?"), + AssistantMessage( + content="", + tool_calls=[ + ToolCall( + id="7SXIeh1Ie", + function=FunctionCall( + name="get_weather", + arguments='{"location": "Paris"}', + ), + ), + ], + ), + ToolMessage( + name="get_weather", + content="22C, sunny", + tool_call_id="7SXIeh1Ie", + ), + ], + tools=[ + Tool( + type="function", + function=Function( + name="get_weather", + description="Get the current weather in a given location", + parameters={ + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "City name", + }, + }, + "required": ["location"], + }, + ), + ), + ], + ) + response = ChatCompletionResponse( + id="cmpl-multiturn-001", + object="chat.completion", + model="mistral-small-latest", + created=1700000003, + choices=[ + ChatCompletionChoice( + index=0, + message=AssistantMessage( + content="The weather in Paris is currently 22°C and sunny.", + tool_calls=None, + ), + finish_reason="stop", + ), + ], + usage=UsageInfo(prompt_tokens=115, completion_tokens=14, total_tokens=129), + ) + + self._run_hook_lifecycle( + "chat_completion_v1_chat_completions_post", + request, + response, + ) + span = self._get_single_span() + + self.assertEqual(span.name, "chat mistral-small-latest") + self.assertSpanAttributes( + span, + { + "gen_ai.usage.input_tokens": 115, + "gen_ai.usage.output_tokens": 14, + }, + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.input.messages"]), + [ + { + "role": "user", + "parts": [ + {"type": "text", "content": "What is the weather in Paris?"} + ], + }, + { + "role": "assistant", + "parts": [ + {"type": "text", "content": ""}, + { + "type": "tool_call", + "name": "get_weather", + "id": "7SXIeh1Ie", + "arguments": '{"location": "Paris"}', + }, + ], + }, + { + "role": "tool", + "parts": [ + { + "type": "tool_call_response", + "response": "22C, sunny", + "id": "7SXIeh1Ie", + }, + ], + }, + ], + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": "The weather in Paris is currently 22°C and sunny.", + } + ], + "finish_reason": "stop", + }, + ], + ) + + # Tool definitions + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.tool.definitions"]), + [ + { + "type": "function", + "name": "get_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "City name"} + }, + "required": ["location"], + }, + }, + ], + ) + + # -- Content chunks (multimodal) ------------------------------------------- + + def test_content_chunks_text_and_image(self): + """Request with content as array of chunks (text + image_url).""" + request = ChatCompletionRequest( + model="mistral-small-latest", + max_tokens=64, + messages=[ + UserMessage( + content=[ + TextChunk(text="Describe this image briefly"), + ImageURLChunk( + image_url=ImageURL( + url="https://example.com/image.jpg", + ), + ), + ], + ), + ], + ) + response = ChatCompletionResponse( + id="cmpl-vision-001", + object="chat.completion", + model="mistral-small-latest", + created=1700000004, + choices=[ + ChatCompletionChoice( + index=0, + message=AssistantMessage( + content="The image shows a landscape.", + tool_calls=None, + ), + finish_reason="stop", + ), + ], + usage=UsageInfo(prompt_tokens=96, completion_tokens=8, total_tokens=104), + ) + + self._run_hook_lifecycle( + "chat_completion_v1_chat_completions_post", + request, + response, + ) + span = self._get_single_span() + + self.assertEqual(span.name, "chat mistral-small-latest") + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.input.messages"]), + [ + { + "role": "user", + "parts": [ + {"type": "text", "content": "Describe this image briefly"}, + { + "type": "uri", + "modality": "image", + "uri": "https://example.com/image.jpg", + }, + ], + }, + ], + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + {"type": "text", "content": "The image shows a landscape."} + ], + "finish_reason": "stop", + }, + ], + ) + + def test_content_chunks_thinking(self): + """Response with thinking content chunk. + + Tests the "thinking" → "reasoning" mapping in _content_to_parts. + """ + request = ChatCompletionRequest( + model="magistral-small-latest", + messages=[ + UserMessage(content="What is 15 * 37?"), + ], + ) + response = ChatCompletionResponse( + id="cmpl-think-001", + object="chat.completion", + model="magistral-small-latest", + created=1700000006, + choices=[ + ChatCompletionChoice( + index=0, + message=AssistantMessage( + content=[ + ThinkChunk( + thinking=[ + TextChunk( + text="Let me calculate: 15 * 37 = 15 * 30 + 15 * 7 = 450 + 105 = 555" + ), + ], + ), + TextChunk(text="15 * 37 = 555"), + ], + ), + finish_reason="stop", + ), + ], + usage=UsageInfo(prompt_tokens=10, completion_tokens=30, total_tokens=40), + ) + + self._run_hook_lifecycle( + "chat_completion_v1_chat_completions_post", + request, + response, + ) + span = self._get_single_span() + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.input.messages"]), + [ + { + "role": "user", + "parts": [{"type": "text", "content": "What is 15 * 37?"}], + }, + ], + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + { + "type": "reasoning", + "content": "Let me calculate: 15 * 37 = 15 * 30 + 15 * 7 = 450 + 105 = 555", + }, + {"type": "text", "content": "15 * 37 = 555"}, + ], + "finish_reason": "stop", + }, + ], + ) + + # -- Multiple choices (n > 1) ---------------------------------------------- + + def test_multiple_choices(self): + """Response with multiple choices (n=2).""" + request = ChatCompletionRequest( + model="mistral-small-latest", + n=2, + max_tokens=32, + messages=[ + UserMessage(content="Tell me a joke"), + ], + ) + response = ChatCompletionResponse( + id="cmpl-multi-001", + object="chat.completion", + model="mistral-small-latest", + created=1700000005, + choices=[ + ChatCompletionChoice( + index=0, + message=AssistantMessage( + content="Why did the chicken cross the road?", + tool_calls=None, + ), + finish_reason="stop", + ), + ChatCompletionChoice( + index=1, + message=AssistantMessage( + content="A programmer walks into a bar...", + tool_calls=None, + ), + finish_reason="stop", + ), + ], + usage=UsageInfo(prompt_tokens=10, completion_tokens=20, total_tokens=30), + ) + + self._run_hook_lifecycle( + "chat_completion_v1_chat_completions_post", + request, + response, + ) + span = self._get_single_span() + + self.assertEqual( + span.attributes["gen_ai.response.finish_reasons"], ("stop", "stop") + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.input.messages"]), + [ + { + "role": "user", + "parts": [{"type": "text", "content": "Tell me a joke"}], + }, + ], + ) + + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": "Why did the chicken cross the road?", + } + ], + "finish_reason": "stop", + }, + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": "A programmer walks into a bar...", + } + ], + "finish_reason": "stop", + }, + ], + ) + + # -- Error response -------------------------------------------------------- + + def test_error_response(self): + """API error response (object="error") via after_error hook.""" + + request = ChatCompletionRequest( + model="mistral-large-latest", + temperature=0.7, + top_p=1, + max_tokens=512, + messages=[ + SystemMessage(content="You are a helpful assistant."), + UserMessage(content="What is the best French cheese?"), + ], + ) + error_body = { + "object": "error", + "message": "Invalid model: nonexistent-model", + "type": "invalid_model", + "param": None, + "code": "1500", + } + + self._run_hook_error_lifecycle( + "chat_completion_v1_chat_completions_post", + request, + error_body, + status_code=400, + error=Exception("Bad Request"), + ) + span = self._get_single_span() + + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual(span.status.description, "Invalid model: nonexistent-model") + self.assertSpanAttributes( + span, + { + "error.type": "invalid_model", + "mistral_ai.error.code": "1500", + "http.response.status_code": 400, + }, + ) + + # Exception event per OTEL exception semantic conventions + exc_events = [e for e in span.events if e.name == "exception"] + self.assertEqual( + len(exc_events), 2 + ) # one from record_exception, one from API error body + api_error_event = exc_events[1] + self.assertEqual(api_error_event.attributes["exception.type"], "invalid_model") + self.assertEqual( + api_error_event.attributes["exception.message"], + "Invalid model: nonexistent-model", + ) + + # -- Streaming response ---------------------------------------------------- + + def test_streaming_chat_completion_enriches_span(self): + """Streaming responses must set the same response attributes as non-streaming. + + Simulates a realistic SSE stream with multiple CompletionEvent chunks: + - chunk 1: role + first content delta + - chunk 2: more content + - chunk 3: finish_reason + usage + - sentinel: [DONE] + + After consuming the stream and closing, the span must contain + gen_ai.response.id, gen_ai.response.model, gen_ai.usage.*, + gen_ai.response.finish_reasons, and gen_ai.output.messages. + """ + request = ChatCompletionRequest( + model="mistral-large-latest", + temperature=0.7, + max_tokens=512, + messages=[ + SystemMessage(content="You are a helpful assistant."), + UserMessage(content="What is the best French cheese?"), + ], + ) + response_events = [ + CompletionEvent( + data=CompletionChunk( + id="cmpl-stream-001", + model="mistral-large-latest", + object="chat.completion.chunk", + created=1700000000, + choices=[ + CompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(role="assistant", content="Camembert"), + finish_reason=None, + ), + ], + ), + ), + CompletionEvent( + data=CompletionChunk( + id="cmpl-stream-001", + model="mistral-large-latest", + object="chat.completion.chunk", + created=1700000000, + choices=[ + CompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(content=" is a classic choice."), + finish_reason=None, + ), + ], + ), + ), + CompletionEvent( + data=CompletionChunk( + id="cmpl-stream-001", + model="mistral-large-latest", + object="chat.completion.chunk", + created=1700000000, + choices=[ + CompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(content=""), + finish_reason="stop", + ), + ], + usage=UsageInfo( + prompt_tokens=20, completion_tokens=8, total_tokens=28 + ), + ), + ), + ] + + self._run_hook_lifecycle( + "chat_completion_v1_chat_completions_post", + request, + response_events, + streaming=True, + ) + span = self._get_single_span() + + # Request-side attributes + self.assertEqual(span.name, "chat mistral-large-latest") + self.assertSpanAttributes( + span, + { + "gen_ai.operation.name": "chat", + "gen_ai.provider.name": "mistral_ai", + "gen_ai.request.model": "mistral-large-latest", + "gen_ai.request.temperature": 0.7, + "gen_ai.request.max_tokens": 512, + "gen_ai.response.id": "cmpl-stream-001", + "gen_ai.response.model": "mistral-large-latest", + "gen_ai.usage.input_tokens": 20, + "gen_ai.usage.output_tokens": 8, + "gen_ai.response.finish_reasons": ("stop",), + }, + ) + + # Output messages — accumulated from deltas + self.assertListEqual( + _parse_json_list(span.attributes["gen_ai.output.messages"]), + [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": "Camembert is a classic choice.", + } + ], + "finish_reason": "stop", + }, + ], + ) + + # -- create_function_result (client-side tool execution) ------------------- + + def test_create_function_result_span_attributes(self): + """create_function_result must emit an execute_tool span with all GenAI attributes.""" + + def get_weather(location: str) -> dict: + return {"temperature": "22C", "condition": "sunny"} + + function_call = FunctionCallEntry( + tool_call_id="tc-001", + name="get_weather", + arguments='{"location": "Paris"}', + id="fc-001", + ) + run_tool = RunFunction( + name="get_weather", + callable=get_weather, + tool=FunctionTool(function=Function(name="get_weather", parameters={})), + ) + + result = asyncio.get_event_loop().run_until_complete( + create_function_result(function_call, run_tool) + ) + self.assertEqual(result.tool_call_id, "tc-001") + + span = self._get_single_span() + + self.assertEqual(span.name, "execute_tool get_weather") + self.assertSpanAttributes( + span, + { + "gen_ai.operation.name": "execute_tool", + "gen_ai.provider.name": "mistral_ai", + "gen_ai.tool.name": "get_weather", + "gen_ai.tool.call.id": "fc-001", + "gen_ai.tool.call.arguments": '{"location": "Paris"}', + "gen_ai.tool.type": "function", + }, + ) + self.assertEqual( + json.loads(span.attributes["gen_ai.tool.call.result"]), + {"temperature": "22C", "condition": "sunny"}, + ) + + def test_create_function_result_error_span(self): + """When the tool raises, the span must record the error and retain identity attributes.""" + + def failing_tool(x: int) -> str: + raise ValueError("boom") + + function_call = FunctionCallEntry( + tool_call_id="tc-err", + name="failing_tool", + arguments='{"x": 1}', + id="fc-err", + ) + run_tool = RunFunction( + name="failing_tool", + callable=failing_tool, + tool=FunctionTool(function=Function(name="failing_tool", parameters={})), + ) + + asyncio.get_event_loop().run_until_complete( + create_function_result(function_call, run_tool, continue_on_fn_error=True) + ) + + span = self._get_single_span() + + self.assertSpanAttributes( + span, + { + "gen_ai.operation.name": "execute_tool", + "gen_ai.tool.name": "failing_tool", + "gen_ai.tool.call.id": "fc-err", + }, + ) + # Result should NOT be present (tool didn't succeed) + self.assertNotIn("gen_ai.tool.call.result", span.attributes) + # Error status must be recorded + self.assertEqual(span.status.status_code, StatusCode.ERROR) + # Exception event must be recorded + self.assertTrue( + any(e.name == "exception" for e in span.events), + "Expected an exception event on the span", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/mistralai/extra/tests/test_serialization.py b/src/mistralai/extra/tests/test_serialization.py new file mode 100644 index 00000000..3c88aa71 --- /dev/null +++ b/src/mistralai/extra/tests/test_serialization.py @@ -0,0 +1,447 @@ +"""Unit tests for the OTEL serialization helpers. + +Each test covers a single function with both happy-path and edge-case inputs. +The functions are pure (dict → str/list), so no OTEL setup is needed. +""" + +import json +import unittest + +from mistralai.extra.observability.serialization import ( + _content_to_parts, + _tool_calls_to_parts, + serialize_input_message, + serialize_output_message, + serialize_tool_definition, +) + + +def _parse(json_str: str): + """Shorthand: parse a JSON string returned by a serialize_* function.""" + return json.loads(json_str) + + +class TestContentToParts(unittest.TestCase): + def test_none(self): + self.assertEqual(_content_to_parts(None), []) + + def test_string(self): + self.assertEqual( + _content_to_parts("hello"), + [{"type": "text", "content": "hello"}], + ) + + def test_empty_string(self): + self.assertEqual( + _content_to_parts(""), + [{"type": "text", "content": ""}], + ) + + def test_empty_list(self): + self.assertEqual(_content_to_parts([]), []) + + def test_list_of_strings(self): + self.assertEqual( + _content_to_parts(["a", "b"]), + [ + {"type": "text", "content": "a"}, + {"type": "text", "content": "b"}, + ], + ) + + def test_text_chunk(self): + self.assertEqual( + _content_to_parts([{"type": "text", "text": "hello"}]), + [{"type": "text", "content": "hello"}], + ) + + def test_text_chunk_missing_text_field(self): + self.assertEqual( + _content_to_parts([{"type": "text"}]), + [{"type": "text", "content": ""}], + ) + + # -- thinking chunks ------------------------------------------------------- + + def test_thinking_chunk_with_sub_chunks(self): + chunk = { + "type": "thinking", + "thinking": [ + {"type": "text", "text": "step 1"}, + {"type": "text", "text": "step 2"}, + ], + } + self.assertEqual( + _content_to_parts([chunk]), + [{"type": "reasoning", "content": "step 1\nstep 2"}], + ) + + def test_thinking_chunk_filters_non_text_sub_chunks(self): + chunk = { + "type": "thinking", + "thinking": [ + {"type": "text", "text": "kept"}, + {"type": "other", "text": "ignored"}, + "also ignored", + ], + } + self.assertEqual( + _content_to_parts([chunk]), + [{"type": "reasoning", "content": "kept"}], + ) + + def test_thinking_chunk_fallback_plain_string(self): + chunk = {"type": "thinking", "thinking": "raw thought"} + self.assertEqual( + _content_to_parts([chunk]), + [{"type": "reasoning", "content": "raw thought"}], + ) + + def test_thinking_chunk_missing_thinking_field(self): + """Empty string default → str("") fallback.""" + chunk = {"type": "thinking"} + self.assertEqual( + _content_to_parts([chunk]), + [{"type": "reasoning", "content": ""}], + ) + + # -- image_url chunks ------------------------------------------------------ + + def test_image_url_chunk_dict(self): + chunk = {"type": "image_url", "image_url": {"url": "https://img.png"}} + self.assertEqual( + _content_to_parts([chunk]), + [{"type": "uri", "modality": "image", "uri": "https://img.png"}], + ) + + def test_image_url_chunk_string_fallback(self): + chunk = {"type": "image_url", "image_url": "https://img.png"} + self.assertEqual( + _content_to_parts([chunk]), + [{"type": "uri", "modality": "image", "uri": "https://img.png"}], + ) + + def test_image_url_chunk_missing_url(self): + chunk = {"type": "image_url", "image_url": {}} + self.assertEqual( + _content_to_parts([chunk]), + [{"type": "uri", "modality": "image", "uri": ""}], + ) + + # -- unknown / catch-all --------------------------------------------------- + + def test_unknown_chunk_type(self): + chunk = {"type": "audio", "data": "..."} + self.assertEqual( + _content_to_parts([chunk]), + [{"type": "audio"}], + ) + + def test_mixed_chunk_types(self): + """Multiple chunk types in one content array.""" + parts = _content_to_parts( + [ + {"type": "text", "text": "look at this"}, + {"type": "image_url", "image_url": {"url": "https://img.png"}}, + "plain string", + ] + ) + self.assertListEqual( + parts, + [ + {"type": "text", "content": "look at this"}, + {"type": "uri", "modality": "image", "uri": "https://img.png"}, + {"type": "text", "content": "plain string"}, + ], + ) + + +class TestToolCallsToParts(unittest.TestCase): + def test_none(self): + self.assertEqual(_tool_calls_to_parts(None), []) + + def test_empty_list(self): + self.assertEqual(_tool_calls_to_parts([]), []) + + def test_full_tool_call(self): + tc = { + "id": "call_123", + "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'}, + } + self.assertEqual( + _tool_calls_to_parts([tc]), + [ + { + "type": "tool_call", + "name": "get_weather", + "id": "call_123", + "arguments": '{"city": "Paris"}', + }, + ], + ) + + def test_missing_id(self): + tc = {"function": {"name": "f"}} + self.assertListEqual( + _tool_calls_to_parts([tc]), + [{"type": "tool_call", "name": "f"}], + ) + + def test_missing_arguments(self): + tc = {"id": "1", "function": {"name": "f"}} + self.assertListEqual( + _tool_calls_to_parts([tc]), + [{"type": "tool_call", "name": "f", "id": "1"}], + ) + + def test_missing_function(self): + """No function key → empty name.""" + tc = {"id": "1"} + self.assertListEqual( + _tool_calls_to_parts([tc]), + [{"type": "tool_call", "name": "", "id": "1"}], + ) + + def test_function_is_none(self): + tc = {"id": "1", "function": None} + self.assertListEqual( + _tool_calls_to_parts([tc]), + [{"type": "tool_call", "name": "", "id": "1"}], + ) + + +class TestSerializeInputMessage(unittest.TestCase): + # -- Happy paths (role-based messages) ------------------------------------ + + def test_user_message(self): + result = _parse(serialize_input_message({"role": "user", "content": "hi"})) + self.assertDictEqual( + result, + { + "role": "user", + "parts": [{"type": "text", "content": "hi"}], + }, + ) + + def test_system_message(self): + result = _parse( + serialize_input_message({"role": "system", "content": "be helpful"}) + ) + self.assertDictEqual( + result, + { + "role": "system", + "parts": [{"type": "text", "content": "be helpful"}], + }, + ) + + def test_assistant_message_with_tool_calls(self): + msg = { + "role": "assistant", + "content": "", + "tool_calls": [{"id": "tc1", "function": {"name": "f", "arguments": "{}"}}], + } + result = _parse(serialize_input_message(msg)) + self.assertEqual(result["role"], "assistant") + # text part from content + tool_call part + self.assertListEqual( + [p["type"] for p in result["parts"]], + ["text", "tool_call"], + ) + + def test_tool_message(self): + msg = {"role": "tool", "content": "22C sunny", "tool_call_id": "tc1"} + result = _parse(serialize_input_message(msg)) + self.assertDictEqual( + result, + { + "role": "tool", + "parts": [ + {"type": "tool_call_response", "response": "22C sunny", "id": "tc1"} + ], + }, + ) + + def test_tool_message_without_tool_call_id(self): + msg = {"role": "tool", "content": "result"} + result = _parse(serialize_input_message(msg)) + self.assertNotIn("id", result["parts"][0]) + + # -- Conversation entry: function.result ---------------------------------- + + def test_function_result_entry(self): + msg = { + "type": "function.result", + "result": '{"status": "ok"}', + "tool_call_id": "tc1", + } + result = _parse(serialize_input_message(msg)) + self.assertDictEqual( + result, + { + "role": "tool", + "parts": [ + { + "type": "tool_call_response", + "response": '{"status": "ok"}', + "id": "tc1", + } + ], + }, + ) + + def test_function_result_entry_without_tool_call_id(self): + msg = {"type": "function.result", "result": "data"} + result = _parse(serialize_input_message(msg)) + self.assertNotIn("id", result["parts"][0]) + + # -- Edge cases ----------------------------------------------------------- + + def test_missing_role_defaults_to_unknown(self): + result = _parse(serialize_input_message({"content": "orphan"})) + self.assertDictEqual( + result, + { + "role": "unknown", + "parts": [{"type": "text", "content": "orphan"}], + }, + ) + + def test_no_content_no_tool_calls(self): + result = _parse(serialize_input_message({"role": "user"})) + self.assertDictEqual(result, {"role": "user", "parts": []}) + + +class TestSerializeOutputMessage(unittest.TestCase): + def test_simple_assistant_response(self): + choice = { + "message": {"role": "assistant", "content": "hello"}, + "finish_reason": "stop", + } + result = _parse(serialize_output_message(choice)) + self.assertDictEqual( + result, + { + "role": "assistant", + "parts": [{"type": "text", "content": "hello"}], + "finish_reason": "stop", + }, + ) + + def test_tool_calls_response(self): + choice = { + "message": { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "tc1", "function": {"name": "f", "arguments": "{}"}}, + ], + }, + "finish_reason": "tool_calls", + } + result = _parse(serialize_output_message(choice)) + self.assertEqual(result["finish_reason"], "tool_calls") + self.assertListEqual( + [p["type"] for p in result["parts"]], + ["tool_call"], + ) + + def test_missing_message(self): + result = _parse(serialize_output_message({})) + self.assertDictEqual( + result, + { + "role": "assistant", + "parts": [], + "finish_reason": "", + }, + ) + + def test_message_is_none(self): + result = _parse(serialize_output_message({"message": None})) + self.assertDictEqual( + result, + { + "role": "assistant", + "parts": [], + "finish_reason": "", + }, + ) + + def test_defaults_role_to_assistant(self): + choice = {"message": {"content": "hi"}, "finish_reason": "stop"} + result = _parse(serialize_output_message(choice)) + self.assertDictEqual( + result, + { + "role": "assistant", + "parts": [{"type": "text", "content": "hi"}], + "finish_reason": "stop", + }, + ) + + +class TestSerializeToolDefinition(unittest.TestCase): + def test_full_definition(self): + tool = { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get weather", + "parameters": {"type": "object", "properties": {}}, + }, + } + serialized = serialize_tool_definition(tool) + self.assertIsNotNone(serialized) + assert serialized is not None + self.assertDictEqual( + _parse(serialized), + { + "type": "function", + "name": "get_weather", + "description": "Get weather", + "parameters": {"type": "object", "properties": {}}, + }, + ) + + def test_minimal_definition(self): + """Only name, no description or parameters.""" + tool = {"function": {"name": "f"}} + serialized = serialize_tool_definition(tool) + self.assertIsNotNone(serialized) + assert serialized is not None + self.assertDictEqual( + _parse(serialized), + { + "type": "function", + "name": "f", + }, + ) + + def test_missing_function_returns_none(self): + self.assertIsNone(serialize_tool_definition({"type": "function"})) + + def test_empty_function_returns_none(self): + self.assertIsNone(serialize_tool_definition({"function": {}})) + + def test_missing_name_returns_none(self): + self.assertIsNone( + serialize_tool_definition({"function": {"description": "no name"}}) + ) + + def test_custom_type_preserved(self): + tool = {"type": "custom_tool", "function": {"name": "f"}} + serialized = serialize_tool_definition(tool) + self.assertIsNotNone(serialized) + assert serialized is not None + self.assertDictEqual( + _parse(serialized), + { + "type": "custom_tool", + "name": "f", + }, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/mistralai/extra/tests/test_streaming.py b/src/mistralai/extra/tests/test_streaming.py new file mode 100644 index 00000000..9f2ec557 --- /dev/null +++ b/src/mistralai/extra/tests/test_streaming.py @@ -0,0 +1,372 @@ +"""Tests for streaming SSE parsing and chunk accumulation. + +Unit tests for the pure functions in ``observability.streaming``, independent +of OTEL span management. +""" + +import json +import unittest + +from mistralai.client.models import ( + CompletionChunk, + CompletionResponseStreamChoice, + CompletionResponseStreamChoiceFinishReason, + DeltaMessage, + FunctionCall, + ToolCall, + UsageInfo, +) +from mistralai.extra.observability.streaming import ( + accumulate_chunks_to_response_dict, + parse_sse_chunks, +) + +_DEFAULT_ID = "id-1" +_DEFAULT_MODEL = "m" + + +def _single_choice_chunk( + id: str = _DEFAULT_ID, + model: str = _DEFAULT_MODEL, + role: str | None = None, + content: str | None = None, + tool_calls: list[ToolCall] | None = None, + finish_reason: CompletionResponseStreamChoiceFinishReason | None = None, + object: str | None = None, + created: int | None = None, + usage: UsageInfo | None = None, +) -> CompletionChunk: + return CompletionChunk( + id=id, + model=model, + choices=[ + CompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(role=role, content=content, tool_calls=tool_calls), + finish_reason=finish_reason, + ) + ], + object=object, + created=created, + usage=usage, + ) + + +def _dump(model) -> dict: + return model.model_dump(mode="json", by_alias=True) + + +def _to_sse(chunks: list[CompletionChunk], done: bool = True) -> bytes: + """Build SSE bytes from CompletionChunk models.""" + lines = [f"data: {json.dumps(_dump(c))}" for c in chunks] + if done: + lines.append("data: [DONE]") + return ("\n\n".join(lines) + "\n\n").encode() + + +class TestParseSseChunks(unittest.TestCase): + def test_parses_valid_chunks(self): + chunks = [ + _single_choice_chunk(content="hello"), + _single_choice_chunk(content=" world", finish_reason="stop"), + ] + result = parse_sse_chunks(_to_sse(chunks)) + self.assertEqual(result, chunks) + + def test_skips_done_sentinel(self): + chunk = _single_choice_chunk(content="hi", finish_reason="stop") + result = parse_sse_chunks(_to_sse([chunk], done=True)) + self.assertEqual(result, [chunk]) + + def test_skips_invalid_json(self): + sse = b"data: {not valid json}\n\ndata: [DONE]\n\n" + result = parse_sse_chunks(sse) + self.assertEqual(result, []) + + def test_skips_non_data_lines(self): + chunk = _single_choice_chunk(content="hi", finish_reason="stop") + sse = b"event: message\n\n" + _to_sse([chunk]) + result = parse_sse_chunks(sse) + self.assertEqual(result, [chunk]) + + def test_empty_bytes(self): + self.assertEqual(parse_sse_chunks(b""), []) + + +class TestAccumulateChunks(unittest.TestCase): + def test_simple_content_concatenation(self): + chunks = [ + _single_choice_chunk(role="assistant", content="Hello"), + _single_choice_chunk(content=" world"), + _single_choice_chunk( + content="", + finish_reason="stop", + usage=UsageInfo(prompt_tokens=10, completion_tokens=5, total_tokens=15), + ), + ] + result = accumulate_chunks_to_response_dict(chunks) + + self.assertDictEqual( + result, + { + "id": _DEFAULT_ID, + "model": _DEFAULT_MODEL, + "choices": [ + { + "message": {"role": "assistant", "content": "Hello world"}, + "finish_reason": "stop", + } + ], + "usage": { + "prompt_tokens": 10, + "completion_tokens": 5, + "total_tokens": 15, + }, + }, + ) + + def test_tool_call_argument_accumulation(self): + """Tool call arguments fragmented across multiple chunks.""" + chunks = [ + _single_choice_chunk( + role="assistant", + content="", + tool_calls=[ + ToolCall( + id="tc-1", + index=0, + function=FunctionCall(name="get_weather", arguments=""), + ) + ], + ), + _single_choice_chunk( + tool_calls=[ + ToolCall( + index=0, function=FunctionCall(name="", arguments='{"location"') + ) + ], + ), + _single_choice_chunk( + tool_calls=[ + ToolCall( + index=0, function=FunctionCall(name="", arguments=': "Paris"}') + ) + ], + finish_reason="tool_calls", + ), + ] + result = accumulate_chunks_to_response_dict(chunks) + + self.assertDictEqual( + result, + { + "id": _DEFAULT_ID, + "model": _DEFAULT_MODEL, + "choices": [ + { + "message": { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "tc-1", + "function": { + "name": "get_weather", + "arguments": '{"location": "Paris"}', + }, + } + ], + }, + "finish_reason": "tool_calls", + } + ], + }, + ) + + def test_multiple_tool_calls_on_same_choice(self): + """Two tool calls on the same choice, different indexes.""" + chunks = [ + _single_choice_chunk( + role="assistant", + tool_calls=[ + ToolCall( + id="tc-1", + index=0, + function=FunctionCall( + name="get_weather", arguments='{"location": "Paris"}' + ), + ), + ToolCall( + id="tc-2", + index=1, + function=FunctionCall( + name="get_time", arguments='{"timezone": "CET"}' + ), + ), + ], + finish_reason="tool_calls", + ), + ] + result = accumulate_chunks_to_response_dict(chunks) + + self.assertDictEqual( + result, + { + "id": _DEFAULT_ID, + "model": _DEFAULT_MODEL, + "choices": [ + { + "message": { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "tc-1", + "function": { + "name": "get_weather", + "arguments": '{"location": "Paris"}', + }, + }, + { + "id": "tc-2", + "function": { + "name": "get_time", + "arguments": '{"timezone": "CET"}', + }, + }, + ], + }, + "finish_reason": "tool_calls", + } + ], + }, + ) + + def test_multiple_choices(self): + """n > 1: parallel choice accumulation.""" + chunks = [ + CompletionChunk( + id=_DEFAULT_ID, + model=_DEFAULT_MODEL, + choices=[ + CompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(role="assistant", content="Answer A"), + finish_reason=None, + ), + CompletionResponseStreamChoice( + index=1, + delta=DeltaMessage(role="assistant", content="Answer B"), + finish_reason=None, + ), + ], + ), + CompletionChunk( + id=_DEFAULT_ID, + model=_DEFAULT_MODEL, + choices=[ + CompletionResponseStreamChoice( + index=0, delta=DeltaMessage(content=""), finish_reason="stop" + ), + CompletionResponseStreamChoice( + index=1, delta=DeltaMessage(content=""), finish_reason="stop" + ), + ], + ), + ] + result = accumulate_chunks_to_response_dict(chunks) + + self.assertDictEqual( + result, + { + "id": _DEFAULT_ID, + "model": _DEFAULT_MODEL, + "choices": [ + { + "message": {"role": "assistant", "content": "Answer A"}, + "finish_reason": "stop", + }, + { + "message": {"role": "assistant", "content": "Answer B"}, + "finish_reason": "stop", + }, + ], + }, + ) + + def test_missing_usage(self): + """Interrupted stream — no usage in any chunk.""" + chunks = [ + _single_choice_chunk(role="assistant", content="partial"), + ] + result = accumulate_chunks_to_response_dict(chunks) + + self.assertDictEqual( + result, + { + "id": _DEFAULT_ID, + "model": _DEFAULT_MODEL, + "choices": [ + { + "message": {"role": "assistant", "content": "partial"}, + "finish_reason": "", + } + ], + }, + ) + + def test_function_name_accumulation(self): + """Function name split across chunks.""" + chunks = [ + _single_choice_chunk( + tool_calls=[ + ToolCall( + id="tc-1", + index=0, + function=FunctionCall(name="get_", arguments=""), + ) + ], + ), + _single_choice_chunk( + tool_calls=[ + ToolCall( + index=0, + function=FunctionCall( + name="weather", arguments='{"loc": "Paris"}' + ), + ) + ], + finish_reason="tool_calls", + ), + ] + result = accumulate_chunks_to_response_dict(chunks) + + self.assertDictEqual( + result, + { + "id": _DEFAULT_ID, + "model": _DEFAULT_MODEL, + "choices": [ + { + "message": { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "tc-1", + "function": { + "name": "get_weather", + "arguments": '{"loc": "Paris"}', + }, + } + ], + }, + "finish_reason": "tool_calls", + } + ], + }, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/uv.lock b/uv.lock index 461e1d5c..85bc31e1 100644 --- a/uv.lock +++ b/uv.lock @@ -585,6 +585,7 @@ dev = [ { name = "griffe" }, { name = "mcp" }, { name = "mypy" }, + { name = "opentelemetry-sdk" }, { name = "pylint" }, { name = "pytest" }, { name = "pytest-asyncio" }, @@ -609,7 +610,7 @@ requires-dist = [ { name = "invoke", specifier = ">=2.2.0,<3.0.0" }, { name = "mcp", marker = "extra == 'agents'", specifier = ">=1.0,<2.0" }, { name = "opentelemetry-api", specifier = ">=1.33.1,<2.0.0" }, - { name = "opentelemetry-semantic-conventions", specifier = ">=0.59b0,<0.61" }, + { name = "opentelemetry-semantic-conventions", specifier = ">=0.60b1,<0.61" }, { name = "pydantic", specifier = ">=2.11.2" }, { name = "python-dateutil", specifier = ">=2.8.2" }, { name = "pyyaml", specifier = ">=6.0.2,<7.0.0" }, @@ -625,6 +626,7 @@ dev = [ { name = "griffe", specifier = ">=1.7.3,<2" }, { name = "mcp", specifier = ">=1.0,<2" }, { name = "mypy", specifier = "==1.15.0" }, + { name = "opentelemetry-sdk", specifier = ">=1.33.1,<2.0.0" }, { name = "pylint", specifier = "==3.2.3" }, { name = "pytest", specifier = ">=8.2.2,<9" }, { name = "pytest-asyncio", specifier = ">=0.23.7,<0.24" }, @@ -697,28 +699,42 @@ wheels = [ [[package]] name = "opentelemetry-api" -version = "1.38.0" +version = "1.39.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "importlib-metadata" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/08/d8/0f354c375628e048bd0570645b310797299754730079853095bf000fba69/opentelemetry_api-1.38.0.tar.gz", hash = "sha256:f4c193b5e8acb0912b06ac5b16321908dd0843d75049c091487322284a3eea12", size = 65242, upload-time = "2025-10-16T08:35:50.25Z" } +sdist = { url = "https://files.pythonhosted.org/packages/97/b9/3161be15bb8e3ad01be8be5a968a9237c3027c5be504362ff800fca3e442/opentelemetry_api-1.39.1.tar.gz", hash = "sha256:fbde8c80e1b937a2c61f20347e91c0c18a1940cecf012d62e65a7caf08967c9c", size = 65767, upload-time = "2025-12-11T13:32:39.182Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ae/a2/d86e01c28300bd41bab8f18afd613676e2bd63515417b77636fc1add426f/opentelemetry_api-1.38.0-py3-none-any.whl", hash = "sha256:2891b0197f47124454ab9f0cf58f3be33faca394457ac3e09daba13ff50aa582", size = 65947, upload-time = "2025-10-16T08:35:30.23Z" }, + { url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" }, +] + +[[package]] +name = "opentelemetry-sdk" +version = "1.39.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/eb/fb/c76080c9ba07e1e8235d24cdcc4d125ef7aa3edf23eb4e497c2e50889adc/opentelemetry_sdk-1.39.1.tar.gz", hash = "sha256:cf4d4563caf7bff906c9f7967e2be22d0d6b349b908be0d90fb21c8e9c995cc6", size = 171460, upload-time = "2025-12-11T13:32:49.369Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7c/98/e91cf858f203d86f4eccdf763dcf01cf03f1dae80c3750f7e635bfa206b6/opentelemetry_sdk-1.39.1-py3-none-any.whl", hash = "sha256:4d5482c478513ecb0a5d938dcc61394e647066e0cc2676bee9f3af3f3f45f01c", size = 132565, upload-time = "2025-12-11T13:32:35.069Z" }, ] [[package]] name = "opentelemetry-semantic-conventions" -version = "0.59b0" +version = "0.60b1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "opentelemetry-api" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/40/bc/8b9ad3802cd8ac6583a4eb7de7e5d7db004e89cb7efe7008f9c8a537ee75/opentelemetry_semantic_conventions-0.59b0.tar.gz", hash = "sha256:7a6db3f30d70202d5bf9fa4b69bc866ca6a30437287de6c510fb594878aed6b0", size = 129861, upload-time = "2025-10-16T08:36:03.346Z" } +sdist = { url = "https://files.pythonhosted.org/packages/91/df/553f93ed38bf22f4b999d9be9c185adb558982214f33eae539d3b5cd0858/opentelemetry_semantic_conventions-0.60b1.tar.gz", hash = "sha256:87c228b5a0669b748c76d76df6c364c369c28f1c465e50f661e39737e84bc953", size = 137935, upload-time = "2025-12-11T13:32:50.487Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/24/7d/c88d7b15ba8fe5c6b8f93be50fc11795e9fc05386c44afaf6b76fe191f9b/opentelemetry_semantic_conventions-0.59b0-py3-none-any.whl", hash = "sha256:35d3b8833ef97d614136e253c1da9342b4c3c083bbaf29ce31d572a1c3825eed", size = 207954, upload-time = "2025-10-16T08:35:48.054Z" }, + { url = "https://files.pythonhosted.org/packages/7a/5e/5958555e09635d09b75de3c4f8b9cae7335ca545d77392ffe7331534c402/opentelemetry_semantic_conventions-0.60b1-py3-none-any.whl", hash = "sha256:9fa8c8b0c110da289809292b0591220d3a7b53c1526a23021e977d68597893fb", size = 219982, upload-time = "2025-12-11T13:32:36.955Z" }, ] [[package]]