diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/README.md b/instrumentation-loongsuite/loongsuite-instrumentation-vita/README.md new file mode 100644 index 000000000..a722e267c --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/README.md @@ -0,0 +1,17 @@ +# LoongSuite VitaBench Instrumentation + +OpenTelemetry instrumentation for the VitaBench multi-domain simulation framework. + +## Installation + +```bash +pip install loongsuite-instrumentation-vita +``` + +## Usage + +```python +from opentelemetry.instrumentation.vita import VitaInstrumentor + +VitaInstrumentor().instrument() +``` diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/examples/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/examples/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/pyproject.toml b/instrumentation-loongsuite/loongsuite-instrumentation-vita/pyproject.toml new file mode 100644 index 000000000..d1df8fa2e --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/pyproject.toml @@ -0,0 +1,55 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "loongsuite-instrumentation-vita" +dynamic = ["version"] +description = "LoongSuite VitaBench instrumentation" +readme = "README.md" +license = "Apache-2.0" +requires-python = ">=3.10,<4" +authors = [ + { name = "Zhiyong Liu", email = "liuzhiyong.lzy@alibaba-inc.com" }, + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dependencies = [ + 
"opentelemetry-api >= 1.37.0", + "opentelemetry-instrumentation >= 0.58b0", + "opentelemetry-semantic-conventions >= 0.58b0", + "wrapt >= 1.0.0, < 2.0.0", + "opentelemetry-util-genai >= 0.3b0.dev0", +] + +[project.optional-dependencies] +instruments = [ + "vita >= 0.0.1", +] + +[project.entry-points.opentelemetry_instrumentor] +vita = "opentelemetry.instrumentation.vita:VitaInstrumentor" + +[project.urls] +Homepage = "https://github.com/alibaba/loongsuite-python-agent/tree/main/instrumentation-loongsuite/loongsuite-instrumentation-vita" +Repository = "https://github.com/alibaba/loongsuite-python-agent" + +[tool.hatch.version] +path = "src/opentelemetry/instrumentation/vita/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/__init__.py new file mode 100644 index 000000000..1e58668a6 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/__init__.py @@ -0,0 +1,223 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenTelemetry VitaBench Instrumentation + +Usage +----- +.. 
code:: python + + from opentelemetry.instrumentation.vita import VitaInstrumentor + + VitaInstrumentor().instrument() + + # ... run vitabench tasks ... + + VitaInstrumentor().uninstrument() + +API +--- +""" + +from __future__ import annotations + +import logging +from typing import Any, Collection + +from wrapt import wrap_function_wrapper + +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.instrumentation.vita.package import _instruments +from opentelemetry.instrumentation.vita.patch import ( + wrap_generate, + wrap_generate_next_message, + wrap_get_response, + wrap_orchestrator_run, + wrap_orchestrator_step, + wrap_run_task, +) +from opentelemetry.instrumentation.vita.version import __version__ +from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler + +logger = logging.getLogger(__name__) + +__all__ = ["VitaInstrumentor", "__version__"] + + +class VitaInstrumentor(BaseInstrumentor): + """OpenTelemetry instrumentor for VitaBench framework. + + Instruments the following components: + - vita.run.run_task(): Entry spans (ENTRY) + - Orchestrator.run(): Workflow spans (CHAIN) + - Orchestrator.step(): ReAct step spans (STEP) + - LLMAgent.generate_next_message(): Agent spans (AGENT) + - generate(): LLM call spans (LLM) + - Environment.get_response(): Tool execution spans (TOOL) + """ + + def __init__(self): + super().__init__() + self._handler = None + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs: Any) -> None: + """Enable VitaBench instrumentation.""" + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + logger_provider = kwargs.get("logger_provider") + + self._handler = ExtendedTelemetryHandler( + tracer_provider=tracer_provider, + meter_provider=meter_provider, + logger_provider=logger_provider, + ) + + # Hook #5: generate -> LLM. 
Wrap this first so modules that import + # generate directly (for example vita.agent.llm_agent) bind to the + # instrumented function during their import. + try: + wrap_function_wrapper( + module="vita.utils.llm_utils", + name="generate", + wrapper=lambda w, i, a, k: wrap_generate( + w, i, a, k, handler=self._handler + ), + ) + logger.debug("Instrumented vita.utils.llm_utils.generate") + except Exception as e: + logger.warning(f"Could not wrap vita.utils.llm_utils.generate: {e}") + + # Hook #1: run_task -> ENTRY + try: + wrap_function_wrapper( + module="vita.run", + name="run_task", + wrapper=lambda w, i, a, k: wrap_run_task( + w, i, a, k, handler=self._handler + ), + ) + logger.debug("Instrumented vita.run.run_task") + except Exception as e: + logger.warning(f"Could not wrap vita.run.run_task: {e}") + + # Hook #2: Orchestrator.run -> CHAIN + try: + wrap_function_wrapper( + module="vita.orchestrator.orchestrator", + name="Orchestrator.run", + wrapper=lambda w, i, a, k: wrap_orchestrator_run( + w, i, a, k, handler=self._handler + ), + ) + logger.debug("Instrumented Orchestrator.run") + except Exception as e: + logger.warning(f"Could not wrap Orchestrator.run: {e}") + + # Hook #3: Orchestrator.step -> STEP + try: + wrap_function_wrapper( + module="vita.orchestrator.orchestrator", + name="Orchestrator.step", + wrapper=lambda w, i, a, k: wrap_orchestrator_step( + w, i, a, k, handler=self._handler + ), + ) + logger.debug("Instrumented Orchestrator.step") + except Exception as e: + logger.warning(f"Could not wrap Orchestrator.step: {e}") + + # Hook #4a: LLMAgent.generate_next_message -> AGENT + try: + wrap_function_wrapper( + module="vita.agent.llm_agent", + name="LLMAgent.generate_next_message", + wrapper=lambda w, i, a, k: wrap_generate_next_message( + w, i, a, k, handler=self._handler + ), + ) + logger.debug("Instrumented LLMAgent.generate_next_message") + except Exception as e: + logger.warning(f"Could not wrap LLMAgent.generate_next_message: {e}") + + # Hook #4b: 
LLMSoloAgent.generate_next_message -> AGENT + try: + wrap_function_wrapper( + module="vita.agent.llm_agent", + name="LLMSoloAgent.generate_next_message", + wrapper=lambda w, i, a, k: wrap_generate_next_message( + w, i, a, k, handler=self._handler + ), + ) + logger.debug("Instrumented LLMSoloAgent.generate_next_message") + except Exception as e: + logger.warning(f"Could not wrap LLMSoloAgent.generate_next_message: {e}") + + # Hook #6: Environment.get_response -> TOOL + try: + wrap_function_wrapper( + module="vita.environment.environment", + name="Environment.get_response", + wrapper=lambda w, i, a, k: wrap_get_response( + w, i, a, k, handler=self._handler + ), + ) + logger.debug("Instrumented Environment.get_response") + except Exception as e: + logger.warning(f"Could not wrap Environment.get_response: {e}") + + def _uninstrument(self, **kwargs: Any) -> None: + """Disable VitaBench instrumentation.""" + try: + import vita.run # noqa: PLC0415 + + unwrap(vita.run, "run_task") + except Exception as e: + logger.debug(f"Failed to uninstrument vita.run.run_task: {e}") + + try: + import vita.orchestrator.orchestrator # noqa: PLC0415 + + unwrap(vita.orchestrator.orchestrator.Orchestrator, "run") + unwrap(vita.orchestrator.orchestrator.Orchestrator, "step") + except Exception as e: + logger.debug(f"Failed to uninstrument Orchestrator: {e}") + + try: + import vita.agent.llm_agent # noqa: PLC0415 + + unwrap(vita.agent.llm_agent.LLMAgent, "generate_next_message") + unwrap(vita.agent.llm_agent.LLMSoloAgent, "generate_next_message") + except Exception as e: + logger.debug(f"Failed to uninstrument LLMAgent: {e}") + + try: + import vita.utils.llm_utils # noqa: PLC0415 + + unwrap(vita.utils.llm_utils, "generate") + except Exception as e: + logger.debug(f"Failed to uninstrument generate: {e}") + + try: + import vita.environment.environment # noqa: PLC0415 + + unwrap(vita.environment.environment.Environment, "get_response") + except Exception as e: + logger.debug(f"Failed to 
uninstrument Environment: {e}") + + self._handler = None diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/package.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/package.py new file mode 100644 index 000000000..a776722c9 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/package.py @@ -0,0 +1,3 @@ +_instruments = ("vita >= 0.0.1",) + +_supports_metrics = False diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/patch.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/patch.py new file mode 100644 index 000000000..182da38d6 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/patch.py @@ -0,0 +1,432 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Patch functions for VitaBench instrumentation. 
+ +Wraps key vitabench methods to generate OpenTelemetry spans: +- run_task() -> ENTRY spans +- Orchestrator.run() -> CHAIN spans +- Orchestrator.step() -> STEP spans (react) +- LLMAgent.generate_next_message() -> AGENT spans +- generate() -> LLM spans +- Environment.get_response() -> TOOL spans +""" + +from __future__ import annotations + +import json +import logging +import uuid +from contextvars import ContextVar +from typing import Any, Optional + +from opentelemetry import trace as trace_api +from opentelemetry.trace import SpanKind, Status, StatusCode +from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler +from opentelemetry.util.genai.extended_semconv import gen_ai_extended_attributes +from opentelemetry.util.genai.extended_types import ( + EntryInvocation, + ExecuteToolInvocation, + InvokeAgentInvocation, + ReactStepInvocation, +) +from opentelemetry.util.genai.types import ( + Error, + InputMessage, + LLMInvocation, + OutputMessage, + Text, +) + +from .utils import ( + _convert_vita_assistant_to_output, + _convert_vita_messages_to_input, + _get_tool_definitions, + _infer_provider, + _MAX_CONTENT_LEN, +) + +logger = logging.getLogger(__name__) + +# ContextVars for ReAct step tracking +_react_step_invocation: ContextVar[Optional[ReactStepInvocation]] = ContextVar( + "vita_react_step_invocation", default=None +) +_react_step_counter: ContextVar[int] = ContextVar( + "vita_react_step_counter", default=0 +) + +# Reentrancy guard for AGENT span (LLMSoloAgent extends LLMAgent) +_in_agent_invoke: ContextVar[bool] = ContextVar( + "vita_in_agent_invoke", default=False +) + + +def _close_active_react_step(handler: ExtendedTelemetryHandler) -> None: + """Close the currently active react_step span, if any.""" + prev = _react_step_invocation.get() + if prev is not None: + try: + handler.stop_react_step(prev) + except Exception as e: + logger.debug(f"Failed to close react step: {e}") + _react_step_invocation.set(None) + + +# ==================== 
Hook #1: run_task -> ENTRY ==================== + + +def wrap_run_task( + wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler +): + """Wrapper for vita.run.run_task to create ENTRY span.""" + task = args[1] if len(args) > 1 else kwargs.get("task") + domain = args[0] if args else kwargs.get("domain") + + invocation = EntryInvocation( + session_id=str(uuid.uuid4()), + user_id=None, + ) + invocation.attributes["gen_ai.framework"] = "vitabench" + + if task and hasattr(task, "instructions") and task.instructions: + invocation.input_messages = [ + InputMessage(role="user", parts=[Text(content=str(task.instructions)[:_MAX_CONTENT_LEN])]) + ] + + handler.start_entry(invocation) + try: + result = wrapped(*args, **kwargs) + + if result: + output_parts = [] + if hasattr(result, "termination_reason") and result.termination_reason: + output_parts.append(Text(content=f"termination: {result.termination_reason}")) + if hasattr(result, "reward_info") and result.reward_info: + reward = getattr(result.reward_info, "reward", None) + if reward is not None: + output_parts.append(Text(content=f"reward: {reward}")) + if output_parts: + invocation.output_messages = [ + OutputMessage( + role="assistant", + parts=output_parts, + finish_reason="stop", + ) + ] + + handler.stop_entry(invocation) + return result + except Exception as e: + handler.fail_entry(invocation, Error(message=str(e), type=type(e))) + raise + + +# ==================== Hook #2: Orchestrator.run -> CHAIN ==================== + + +def wrap_orchestrator_run( + wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler +): + """Wrapper for Orchestrator.run to create CHAIN span.""" + task = getattr(instance, "task", None) + domain = getattr(instance, "domain", "unknown") + span_name = f"workflow {domain}" + + input_text = "" + if task and hasattr(task, "instructions") and task.instructions: + input_text = str(task.instructions)[:_MAX_CONTENT_LEN] + + tracer = handler._tracer + + # Reset step counter for 
this orchestrator run + counter_token = _react_step_counter.set(0) + step_token = _react_step_invocation.set(None) + + with tracer.start_as_current_span( + name=span_name, + kind=SpanKind.INTERNAL, + attributes={ + "gen_ai.operation.name": "workflow", + "gen_ai.system": "vitabench", + gen_ai_extended_attributes.GEN_AI_SPAN_KIND: "CHAIN", + "gen_ai.framework": "vitabench", + }, + ) as span: + if input_text: + span.set_attribute("input.value", input_text) + + try: + result = wrapped(*args, **kwargs) + + # Close any remaining open step span + _close_active_react_step(handler) + + if result and hasattr(result, "termination_reason") and result.termination_reason: + span.set_attribute("output.value", str(result.termination_reason)) + + span.set_status(Status(StatusCode.OK)) + return result + except Exception as e: + # Close any remaining open step span + _close_active_react_step(handler) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR)) + raise + finally: + _react_step_counter.reset(counter_token) + _react_step_invocation.reset(step_token) + + +# ==================== Hook #3: Orchestrator.step -> STEP ==================== + + +def wrap_orchestrator_step( + wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler +): + """Wrapper for Orchestrator.step to create STEP span on AGENT turns.""" + to_role = getattr(instance, "to_role", None) + + # Import Role enum dynamically to avoid import-time dependency + _Role = None + try: + from vita.orchestrator.orchestrator import Role + _Role = Role + except ImportError: + pass + + is_agent_turn = False + if _Role is not None: + is_agent_turn = (to_role == _Role.AGENT) + else: + is_agent_turn = (str(to_role) == "Role.AGENT" or str(to_role) == "agent") + + if is_agent_turn: + # Close previous STEP span (deferred close strategy) + _close_active_react_step(handler) + + step_num = _react_step_counter.get() + 1 + _react_step_counter.set(step_num) + + step_inv = ReactStepInvocation(round=step_num) + 
handler.start_react_step(step_inv) + _react_step_invocation.set(step_inv) + + try: + result = wrapped(*args, **kwargs) + + if is_agent_turn: + current_step = _react_step_invocation.get() + if current_step: + done = getattr(instance, "done", False) + if done: + term_reason = getattr(instance, "termination_reason", None) + if term_reason: + current_step.finish_reason = ( + term_reason.value + if hasattr(term_reason, "value") + else str(term_reason) + ) + else: + current_step.finish_reason = "agent_stop" + else: + message = getattr(instance, "message", None) + if message and hasattr(message, "is_tool_call") and message.is_tool_call(): + current_step.finish_reason = "tool_call" + else: + current_step.finish_reason = "assistant_text" + + return result + except Exception as e: + current_step = _react_step_invocation.get() + if current_step: + current_step.finish_reason = "error" + handler.fail_react_step(current_step, Error(message=str(e), type=type(e))) + _react_step_invocation.set(None) + raise + + +# ==================== Hook #4: generate_next_message -> AGENT ==================== + + +def wrap_generate_next_message( + wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler +): + """Wrapper for LLMAgent.generate_next_message / LLMSoloAgent.generate_next_message.""" + # Reentrancy guard + if _in_agent_invoke.get(): + return wrapped(*args, **kwargs) + token = _in_agent_invoke.set(True) + + try: + agent_name = instance.__class__.__name__ + model = getattr(instance, "llm", None) + + invocation = InvokeAgentInvocation( + provider="vitabench", + agent_name=agent_name, + request_model=model, + ) + + # input_messages + message = args[0] if args else kwargs.get("message") + state = args[1] if len(args) > 1 else kwargs.get("state") + if message: + invocation.input_messages = _convert_vita_messages_to_input([message]) + + # system_instruction + if state and hasattr(state, "system_messages") and state.system_messages: + invocation.system_instruction = [ + 
Text(content=str(sm.content)[:_MAX_CONTENT_LEN]) + for sm in state.system_messages + if sm and getattr(sm, "content", None) + ] + + # tool_definitions + tools = getattr(instance, "tools", None) + tool_defs = _get_tool_definitions(tools) + if tool_defs: + invocation.tool_definitions = tool_defs + + handler.start_invoke_agent(invocation) + + try: + result = wrapped(*args, **kwargs) + assistant_msg, _ = result + + # output_messages + invocation.output_messages = _convert_vita_assistant_to_output(assistant_msg) + + # token usage + usage = getattr(assistant_msg, "usage", None) + if usage and isinstance(usage, dict): + invocation.input_tokens = usage.get("prompt_tokens") + invocation.output_tokens = usage.get("completion_tokens") + + handler.stop_invoke_agent(invocation) + return result + except Exception as e: + handler.fail_invoke_agent(invocation, Error(message=str(e), type=type(e))) + raise + finally: + _in_agent_invoke.reset(token) + + +# ==================== Hook #5: generate -> LLM ==================== + + +def wrap_generate( + wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler +): + """Wrapper for vita.utils.llm_utils.generate to create LLM span.""" + model = args[0] if args else kwargs.get("model", "unknown") + messages = args[1] if len(args) > 1 else kwargs.get("messages", []) + tools = args[2] if len(args) > 2 else kwargs.get("tools") + temperature = kwargs.get("temperature") + + invocation = LLMInvocation( + request_model=model or "unknown", + provider=_infer_provider(model or ""), + temperature=temperature, + ) + invocation.max_tokens = kwargs.get("max_tokens") + + # input_messages + invocation.input_messages = _convert_vita_messages_to_input(messages) + + # tool_definitions + tool_defs = _get_tool_definitions(tools) + if tool_defs: + invocation.tool_definitions = tool_defs + + handler.start_llm(invocation) + + try: + result = wrapped(*args, **kwargs) + + if result: + # output_messages + invocation.output_messages = 
_convert_vita_assistant_to_output(result) + + # response_model_name + invocation.response_model_name = model + + # finish_reasons + if getattr(result, "tool_calls", None): + invocation.finish_reasons = ["tool_calls"] + else: + invocation.finish_reasons = ["stop"] + + # token usage + usage = getattr(result, "usage", None) + if usage and isinstance(usage, dict): + invocation.input_tokens = usage.get("prompt_tokens") + invocation.output_tokens = usage.get("completion_tokens") + + handler.stop_llm(invocation) + return result + except Exception as e: + handler.fail_llm(invocation, Error(message=str(e), type=type(e))) + raise + + +# ==================== Hook #6: Environment.get_response -> TOOL ==================== + + +def wrap_get_response( + wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler +): + """Wrapper for Environment.get_response to create TOOL span.""" + message = args[0] if args else kwargs.get("message") + + tool_name = getattr(message, "name", "unknown") if message else "unknown" + tool_call_id = getattr(message, "id", None) if message else None + + invocation = ExecuteToolInvocation( + tool_name=tool_name, + tool_call_id=tool_call_id, + provider="vitabench", + ) + + # tool_call_arguments + if message and hasattr(message, "arguments") and message.arguments: + try: + invocation.tool_call_arguments = json.dumps( + message.arguments, ensure_ascii=False, default=str + )[:_MAX_CONTENT_LEN] + except Exception: + invocation.tool_call_arguments = str(message.arguments)[:_MAX_CONTENT_LEN] + + handler.start_execute_tool(invocation) + + try: + result = wrapped(*args, **kwargs) + + # tool_call_result + if result and getattr(result, "content", None): + invocation.tool_call_result = str(result.content)[:_MAX_CONTENT_LEN] + + # Check if tool reported an error + if result and getattr(result, "error", False): + handler.fail_execute_tool( + invocation, + Error(message=f"Tool error: {getattr(result, 'content', '')}", type=RuntimeError), + ) + else: + 
handler.stop_execute_tool(invocation) + + return result + except Exception as e: + handler.fail_execute_tool(invocation, Error(message=str(e), type=type(e))) + raise diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/utils.py new file mode 100644 index 000000000..0793a6cc0 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/utils.py @@ -0,0 +1,169 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utility functions for VitaBench instrumentation. + +Handles conversion between vitabench Message types and +OpenTelemetry GenAI semantic convention types. 
+""" + +from __future__ import annotations + +import json +import logging +from typing import Any, List, Optional + +from opentelemetry.util.genai.types import ( + FunctionToolDefinition, + InputMessage, + OutputMessage, + Text, + ToolCall as OTelToolCall, + ToolCallResponse, +) + +logger = logging.getLogger(__name__) + +_MAX_CONTENT_LEN = 4096 + + +def _convert_vita_messages_to_input(messages: Any) -> List[InputMessage]: + """Convert vita Message list to OTel InputMessage list.""" + if not messages: + return [] + + if not isinstance(messages, list): + messages = [messages] + + result = [] + for msg in messages: + try: + role = getattr(msg, "role", None) + if role is None: + continue + + parts = [] + content = getattr(msg, "content", None) + tool_calls = getattr(msg, "tool_calls", None) + + if role == "tool": + msg_id = getattr(msg, "id", None) or "" + if content: + parts.append( + ToolCallResponse( + id=msg_id, + response=str(content)[:_MAX_CONTENT_LEN], + ) + ) + else: + if content: + parts.append(Text(content=str(content)[:_MAX_CONTENT_LEN])) + if tool_calls: + for tc in tool_calls: + tc_args = getattr(tc, "arguments", {}) + if isinstance(tc_args, dict): + tc_args = json.dumps(tc_args, ensure_ascii=False, default=str) + parts.append( + OTelToolCall( + name=getattr(tc, "name", ""), + id=getattr(tc, "id", None), + arguments=tc_args, + ) + ) + + if parts: + result.append(InputMessage(role=role, parts=parts)) + except Exception as e: + logger.debug(f"Error converting vita message: {e}") + continue + + return result + + +def _convert_vita_assistant_to_output(msg: Any) -> List[OutputMessage]: + """Convert vita AssistantMessage to OTel OutputMessage list.""" + if not msg: + return [] + + parts = [] + content = getattr(msg, "content", None) + tool_calls = getattr(msg, "tool_calls", None) + + if content: + parts.append(Text(content=str(content)[:_MAX_CONTENT_LEN])) + if tool_calls: + for tc in tool_calls: + tc_args = getattr(tc, "arguments", {}) + if isinstance(tc_args, 
dict): + tc_args = json.dumps(tc_args, ensure_ascii=False, default=str) + parts.append( + OTelToolCall( + name=getattr(tc, "name", ""), + id=getattr(tc, "id", None), + arguments=tc_args, + ) + ) + + finish_reason = "tool_calls" if tool_calls else "stop" + + if not parts: + parts.append(Text(content="")) + + return [OutputMessage(role="assistant", parts=parts, finish_reason=finish_reason)] + + +def _infer_provider(model_name: str) -> str: + """Infer provider from model name string.""" + if not model_name: + return "unknown" + m = model_name.lower() + if "gpt" in m or "o1" in m or "o3" in m: + return "openai" + if "claude" in m: + return "anthropic" + if "qwen" in m: + return "alibaba_cloud" + if "deepseek" in m: + return "deepseek" + if "gemini" in m: + return "google" + return "unknown" + + +def _get_tool_definitions(tools: Any) -> Optional[List[FunctionToolDefinition]]: + """Extract tool definitions from vita Tool list.""" + if not tools: + return None + + try: + defs = [] + for t in tools: + name = getattr(t, "name", None) + if not name: + continue + parameters = None + openai_schema = getattr(t, "openai_schema", None) + if isinstance(openai_schema, dict): + function_schema = openai_schema.get("function", openai_schema) + parameters = function_schema.get("parameters") + defs.append( + FunctionToolDefinition( + name=name, + description=getattr(t, "short_desc", None), + parameters=parameters, + ) + ) + return defs if defs else None + except Exception: + return None diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/version.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/version.py new file mode 100644 index 000000000..26056b5d8 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/src/opentelemetry/instrumentation/vita/version.py @@ -0,0 +1 @@ +__version__ = "0.5.0.dev" diff --git 
a/instrumentation-loongsuite/loongsuite-instrumentation-vita/tests/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/tests/conftest.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/tests/conftest.py new file mode 100644 index 000000000..0d2ab7221 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/tests/conftest.py @@ -0,0 +1,99 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Test configuration for VitaBench instrumentation tests.""" + +import os + +import pytest + +from opentelemetry.instrumentation.vita import VitaInstrumentor +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + InMemoryLogExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + + +def pytest_configure(config: pytest.Config): + os.environ["OTEL_SEMCONV_STABILITY_OPT_IN"] = "gen_ai_latest_experimental" + + +# ==================== Exporters ==================== + + +@pytest.fixture(scope="function", name="span_exporter") +def fixture_span_exporter(): + exporter = InMemorySpanExporter() + yield exporter + + +@pytest.fixture(scope="function", name="log_exporter") +def fixture_log_exporter(): + exporter = InMemoryLogExporter() + yield exporter + + +@pytest.fixture(scope="function", name="metric_reader") +def fixture_metric_reader(): + reader = InMemoryMetricReader() + yield reader + + +# ==================== Providers ==================== + + +@pytest.fixture(scope="function", name="tracer_provider") +def fixture_tracer_provider(span_exporter): + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + return provider + + +@pytest.fixture(scope="function", name="logger_provider") +def fixture_logger_provider(log_exporter): + provider = LoggerProvider() + provider.add_log_record_processor(SimpleLogRecordProcessor(log_exporter)) + return provider + + +@pytest.fixture(scope="function", name="meter_provider") +def fixture_meter_provider(metric_reader): + meter_provider = MeterProvider( + metric_readers=[metric_reader], + ) + return meter_provider + + +# ==================== 
Instrumentation ==================== + + +@pytest.fixture(scope="function") +def instrument(tracer_provider, logger_provider, meter_provider): + instrumentor = VitaInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + skip_dep_check=True, + ) + yield instrumentor + instrumentor.uninstrument() diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-vita/tests/test_instrumentor.py b/instrumentation-loongsuite/loongsuite-instrumentation-vita/tests/test_instrumentor.py new file mode 100644 index 000000000..a6a2339f8 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-vita/tests/test_instrumentor.py @@ -0,0 +1,478 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for VitaBench instrumentation. + +The suite exercises all execute.md hook points. External I/O is replaced at the +HTTP/tool boundary, while the Vita agent/orchestrator call chain runs through +the real framework methods. 
+
"""

from __future__ import annotations

from types import SimpleNamespace
from unittest.mock import MagicMock, patch

import pytest

from opentelemetry.instrumentation.vita import VitaInstrumentor


# Every model points at the same fake endpoint; only the model name differs,
# so provider inference is exercised purely from the model identifier.
FAKE_MODELS_CONFIG = {
    "qwen-max": {
        "base_url": "http://fake-api.example.com/v1/chat/completions",
        "headers": {"Authorization": "Bearer test-key"},
    },
    "gpt-4": {
        "base_url": "http://fake-api.example.com/v1/chat/completions",
        "headers": {"Authorization": "Bearer test-key"},
    },
    "claude-3-opus": {
        "base_url": "http://fake-api.example.com/v1/chat/completions",
        "headers": {"Authorization": "Bearer test-key"},
    },
}


def _make_openai_response(content=None, tool_calls=None, usage=None):
    """Build a minimal OpenAI-style chat-completion payload dict."""
    message = {"role": "assistant", "content": content}
    if tool_calls:
        message["tool_calls"] = tool_calls
    return {
        "id": "chatcmpl-test",
        "model": "test-model",
        "choices": [{"message": message, "finish_reason": "stop"}],
        # Default usage when the caller does not supply one.
        "usage": usage
        or {"prompt_tokens": 100, "completion_tokens": 50, "total_tokens": 150},
    }


def _mock_requests_post(response_dict):
    """Return a mock requests.post result whose .json() yields response_dict."""
    mock_resp = MagicMock()
    mock_resp.status_code = 200
    mock_resp.json.return_value = response_dict
    return mock_resp


def _tool_call_response():
    # Completion that asks the agent to invoke the `get_order` tool.
    return _make_openai_response(
        tool_calls=[
            {
                "id": "call_1",
                "type": "function",
                "function": {
                    "name": "get_order",
                    "arguments": '{"order_id": "123"}',
                },
            }
        ],
        usage={"prompt_tokens": 100, "completion_tokens": 20, "total_tokens": 120},
    )


def _text_response(content="Order 123 has been delivered. 
###STOP###"): + return _make_openai_response( + content=content, + usage={"prompt_tokens": 200, "completion_tokens": 30, "total_tokens": 230}, + ) + + +class FakeTool: + name = "get_order" + short_desc = "Get order details" + openai_schema = { + "type": "function", + "function": { + "name": "get_order", + "description": "Get order details", + "parameters": { + "type": "object", + "properties": {"order_id": {"type": "string"}}, + }, + }, + } + + +class FakeTools: + def __init__(self): + self.db = SimpleNamespace(time="2026-01-01 00:00:00") + self._tools = {"get_order": FakeTool()} + + def get_tools(self): + return self._tools + + def use_tool(self, tool_name, **kwargs): + return {"tool": tool_name, "arguments": kwargs, "status": "delivered"} + + def get_db_hash(self): + return "fake-db-hash" + + +class DeterministicUser: + def get_init_state(self, message_history=None): + return SimpleNamespace(messages=message_history or []) + + def generate_next_message(self, message, state): + from vita.data_model.message import UserMessage + + user_message = UserMessage(role="user", content="Check order 123") + state.messages.append(user_message) + return user_message, state + + +def _make_agent(): + from vita.agent.llm_agent import LLMAgent + + return LLMAgent( + tools=[FakeTool()], + domain_policy="You are helpful at {time}.", + llm="qwen-max", + llm_args={}, + time="2026-01-01 00:00:00", + language="english", + ) + + +def _make_orchestrator(): + from vita.environment.environment import Environment + from vita.orchestrator.orchestrator import Orchestrator + + return Orchestrator( + domain="delivery", + agent=_make_agent(), + user=DeterministicUser(), + environment=Environment(domain_name="delivery", tools=FakeTools()), + task=SimpleNamespace( + id="task_001", + instructions="Check order 123", + message_history=None, + ), + max_steps=6, + max_errors=3, + language="english", + ) + + +def _span_attrs(spans, name): + span = next(s for s in spans if s.name == name) + return 
dict(span.attributes)


class TestVitaInstrumentor:
    def test_instrument_and_uninstrument(
        self, tracer_provider, logger_provider, meter_provider
    ):
        # instrument() installs the telemetry handler; uninstrument() removes it.
        instrumentor = VitaInstrumentor()
        instrumentor.instrument(
            tracer_provider=tracer_provider,
            logger_provider=logger_provider,
            meter_provider=meter_provider,
            skip_dep_check=True,
        )
        assert instrumentor._handler is not None
        instrumentor.uninstrument()
        assert instrumentor._handler is None

    def test_instrumentation_dependencies(self):
        assert VitaInstrumentor().instrumentation_dependencies() == (
            "vita >= 0.0.1",
        )


class TestLLMSpan:
    def test_llm_span_text_response(self, instrument, span_exporter):
        from vita.data_model.message import UserMessage
        from vita.utils.llm_utils import generate

        # Replace the model registry and the HTTP boundary; generate() itself
        # runs through the real framework code.
        with patch("vita.utils.llm_utils.models", FAKE_MODELS_CONFIG), patch(
            "requests.post",
            return_value=_mock_requests_post(
                _make_openai_response(
                    content="The order has been delivered.",
                    usage={
                        "prompt_tokens": 150,
                        "completion_tokens": 30,
                        "total_tokens": 180,
                    },
                )
            ),
        ):
            result = generate(
                model="qwen-max",
                messages=[UserMessage(role="user", content="Where is my order?")],
            )

        assert result.content == "The order has been delivered." 
+
        spans = span_exporter.get_finished_spans()
        attrs = _span_attrs(spans, "chat qwen-max")
        assert attrs["gen_ai.operation.name"] == "chat"
        assert attrs["gen_ai.span.kind"] == "LLM"
        assert attrs["gen_ai.request.model"] == "qwen-max"
        assert attrs["gen_ai.provider.name"] == "alibaba_cloud"
        assert attrs["gen_ai.usage.input_tokens"] == 150
        assert attrs["gen_ai.usage.output_tokens"] == 30
        assert attrs["gen_ai.response.finish_reasons"] == ("stop",)

    def test_llm_span_tool_call_response(self, instrument, span_exporter):
        from vita.data_model.message import UserMessage
        from vita.utils.llm_utils import generate

        with patch("vita.utils.llm_utils.models", FAKE_MODELS_CONFIG), patch(
            "requests.post", return_value=_mock_requests_post(_tool_call_response())
        ):
            result = generate(
                model="gpt-4",
                messages=[UserMessage(role="user", content="Check my order")],
            )

        assert result.tool_calls is not None
        attrs = _span_attrs(span_exporter.get_finished_spans(), "chat gpt-4")
        assert attrs["gen_ai.response.finish_reasons"] == ("tool_calls",)
        assert attrs["gen_ai.provider.name"] == "openai"

    def test_llm_span_captures_positional_tools(self, instrument, span_exporter):
        from vita.data_model.message import UserMessage
        from vita.utils.llm_utils import generate

        with patch("vita.utils.llm_utils.models", FAKE_MODELS_CONFIG), patch(
            "requests.post", return_value=_mock_requests_post(_text_response("Done."))
        ):
            # Tools passed positionally (not as a keyword) must still be
            # captured on the span's tool-definition attribute.
            generate(
                "qwen-max",
                [UserMessage(role="user", content="Check my order")],
                [FakeTool()],
            )

        attrs = _span_attrs(span_exporter.get_finished_spans(), "chat qwen-max")
        assert "gen_ai.tool.definitions" in attrs
        assert "get_order" in attrs["gen_ai.tool.definitions"]


class TestToolSpan:
    def test_tool_span_created(self, instrument, span_exporter):
        from vita.data_model.message import ToolCall
        from vita.environment.environment import Environment

        env = Environment(domain_name="delivery", tools=FakeTools())
        result = 
env.get_response(
            ToolCall(id="tc_42", name="get_order", arguments={"order_id": "999"})
        )

        assert result.content is not None
        attrs = _span_attrs(
            span_exporter.get_finished_spans(), "execute_tool get_order"
        )
        assert attrs["gen_ai.operation.name"] == "execute_tool"
        assert attrs["gen_ai.span.kind"] == "TOOL"
        assert attrs["gen_ai.tool.name"] == "get_order"
        assert attrs["gen_ai.tool.call.id"] == "tc_42"

    def test_tool_span_on_error(self, instrument, span_exporter):
        from vita.data_model.message import ToolCall
        from vita.environment.environment import Environment

        # A raising tool must surface as an errored tool span, not a crash:
        # the environment turns the exception into an error response.
        tools = FakeTools()
        tools.use_tool = MagicMock(side_effect=RuntimeError("Tool failed"))
        env = Environment(domain_name="delivery", tools=tools)
        result = env.get_response(
            ToolCall(id="tc_err", name="get_order", arguments={})
        )

        assert result.error is True
        tool_span = next(
            s
            for s in span_exporter.get_finished_spans()
            if s.name == "execute_tool get_order"
        )
        assert tool_span.status.status_code.name == "ERROR"


class TestAgentSpan:
    def test_agent_span_created_for_llm_agent(self, instrument, span_exporter):
        from vita.data_model.message import UserMessage

        agent = _make_agent()
        state = agent.get_init_state([])

        with patch("vita.utils.llm_utils.models", FAKE_MODELS_CONFIG), patch(
            "requests.post", return_value=_mock_requests_post(_text_response("Sure."))
        ):
            assistant_msg, _ = agent.generate_next_message(
                UserMessage(role="user", content="I need help"), state
            )

        assert assistant_msg.content == "Sure." 
+
        spans = span_exporter.get_finished_spans()
        agent_span = next(s for s in spans if s.name == "invoke_agent LLMAgent")
        llm_span = next(s for s in spans if s.name == "chat qwen-max")
        attrs = dict(agent_span.attributes)
        assert attrs["gen_ai.operation.name"] == "invoke_agent"
        assert attrs["gen_ai.span.kind"] == "AGENT"
        assert attrs["gen_ai.agent.name"] == "LLMAgent"
        assert attrs["gen_ai.request.model"] == "qwen-max"
        # The LLM span must be parented under the agent span.
        assert llm_span.parent.span_id == agent_span.context.span_id

    def test_agent_span_created_for_llm_solo_agent(self, instrument, span_exporter):
        from vita.agent.llm_agent import LLMSoloAgent

        agent = LLMSoloAgent(
            tools=[FakeTool()],
            domain_policy="unused",
            llm="qwen-max",
            llm_args={},
            time="2026-01-01 00:00:00",
            language="english",
        )
        state = agent.get_init_state([])

        with patch("vita.utils.llm_utils.models", FAKE_MODELS_CONFIG), patch(
            "requests.post", return_value=_mock_requests_post(_tool_call_response())
        ):
            # Solo agents take no incoming user message.
            agent.generate_next_message(None, state)

        attrs = _span_attrs(
            span_exporter.get_finished_spans(), "invoke_agent LLMSoloAgent"
        )
        assert attrs["gen_ai.span.kind"] == "AGENT"
        assert attrs["gen_ai.agent.name"] == "LLMSoloAgent"


class TestStepAndChainSpans:
    def test_orchestrator_run_creates_chain_steps_agents_llms_and_tools(
        self, instrument, span_exporter
    ):
        # Round 1 returns a tool call; round 2 returns a ###STOP### answer,
        # so the run terminates after exactly two react steps.
        responses = [
            _mock_requests_post(_tool_call_response()),
            _mock_requests_post(_text_response()),
        ]

        with patch("vita.utils.llm_utils.models", FAKE_MODELS_CONFIG), patch(
            "requests.post", side_effect=responses
        ):
            result = _make_orchestrator().run()

        assert result.termination_reason == "agent_stop"
        spans = span_exporter.get_finished_spans()
        chain = next(s for s in spans if s.name == "workflow delivery")
        steps = sorted(
            [s for s in spans if s.name == "react step"], key=lambda s: s.start_time
        )
        agents = sorted(
            [s for s in spans if s.name == "invoke_agent LLMAgent"],
            key=lambda s: s.start_time,
        )
        
llms = sorted(
            [s for s in spans if s.name == "chat qwen-max"],
            key=lambda s: s.start_time,
        )
        tools = [s for s in spans if s.name == "execute_tool get_order"]

        assert len(steps) == 2
        assert len(agents) == 2
        assert len(llms) == 2
        assert len(tools) == 1

        chain_attrs = dict(chain.attributes)
        assert chain_attrs["gen_ai.operation.name"] == "workflow"
        assert chain_attrs["gen_ai.span.kind"] == "CHAIN"
        assert chain_attrs["gen_ai.framework"] == "vitabench"

        assert dict(steps[0].attributes)["gen_ai.react.round"] == 1
        assert dict(steps[1].attributes)["gen_ai.react.round"] == 2
        # Expected parenting: chain -> step -> agent -> llm, with the tool
        # span hanging off the step that issued the tool call.
        for step in steps:
            assert step.parent.span_id == chain.context.span_id
        assert agents[0].parent.span_id == steps[0].context.span_id
        assert agents[1].parent.span_id == steps[1].context.span_id
        assert llms[0].parent.span_id == agents[0].context.span_id
        assert llms[1].parent.span_id == agents[1].context.span_id
        assert tools[0].parent.span_id == steps[0].context.span_id

    def test_open_step_fails_when_env_turn_raises(self, instrument, span_exporter):
        # An exception inside the environment turn must mark both the open
        # step and the chain span as errored, and still propagate.
        with patch("vita.utils.llm_utils.models", FAKE_MODELS_CONFIG), patch(
            "requests.post", return_value=_mock_requests_post(_tool_call_response())
        ), patch(
            "vita.environment.environment.Environment.get_response",
            side_effect=RuntimeError("env broke"),
        ):
            with pytest.raises(RuntimeError, match="env broke"):
                _make_orchestrator().run()

        spans = span_exporter.get_finished_spans()
        step = next(s for s in spans if s.name == "react step")
        chain = next(s for s in spans if s.name == "workflow delivery")
        step_attrs = dict(step.attributes)
        assert step.status.status_code.name == "ERROR"
        assert step_attrs["gen_ai.react.finish_reason"] == "error"
        assert chain.status.status_code.name == "ERROR"


class TestEntrySpan:
    def test_run_task_entry_wraps_orchestrator_trace(self, instrument, span_exporter):
        from vita.run import run_task

        # Replace run_task's internals with a real orchestrator run so the
        # entry span wraps an actual workflow trace.
        def fake_internal(**kwargs):
            return _make_orchestrator().run()
        
+
        responses = [
            _mock_requests_post(_tool_call_response()),
            _mock_requests_post(_text_response()),
        ]
        task = SimpleNamespace(
            id="task_001",
            instructions="Check order 123",
            message_history=None,
        )

        with patch("vita.run._run_task_internal", side_effect=fake_internal), patch(
            "vita.utils.llm_utils.models", FAKE_MODELS_CONFIG
        ), patch("requests.post", side_effect=responses):
            result = run_task("delivery", task, "llm_agent", "user_simulator")

        assert result.termination_reason == "agent_stop"
        spans = span_exporter.get_finished_spans()
        entry = next(s for s in spans if s.name == "enter_ai_application_system")
        chain = next(s for s in spans if s.name == "workflow delivery")
        attrs = dict(entry.attributes)
        assert attrs["gen_ai.operation.name"] == "enter"
        assert attrs["gen_ai.span.kind"] == "ENTRY"
        assert attrs["gen_ai.framework"] == "vitabench"
        assert "gen_ai.session.id" in attrs
        # The workflow chain span must nest under the entry span.
        assert chain.parent.span_id == entry.context.span_id


class TestProviderInference:
    def test_common_provider_names(self, instrument, span_exporter):
        from vita.data_model.message import UserMessage
        from vita.utils.llm_utils import generate

        for model in ("gpt-4", "claude-3-opus", "qwen-max"):
            with patch("vita.utils.llm_utils.models", FAKE_MODELS_CONFIG), patch(
                "requests.post",
                return_value=_mock_requests_post(_make_openai_response(content="Hi")),
            ):
                generate(
                    model=model,
                    messages=[UserMessage(role="user", content="Hi")],
                )

        # Map each request model to the provider name the instrumentation
        # inferred for its chat span.
        providers = {
            dict(s.attributes)["gen_ai.request.model"]: dict(s.attributes)[
                "gen_ai.provider.name"
            ]
            for s in span_exporter.get_finished_spans()
            if s.name.startswith("chat ")
        }
        assert providers["gpt-4"] == "openai"
        assert providers["claude-3-opus"] == "anthropic"
        assert providers["qwen-max"] == "alibaba_cloud"