diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/src/opentelemetry/instrumentation/agentscope/_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/src/opentelemetry/instrumentation/agentscope/_wrapper.py index fb1b9b75e..dd34323c2 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/src/opentelemetry/instrumentation/agentscope/_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/src/opentelemetry/instrumentation/agentscope/_wrapper.py @@ -16,13 +16,31 @@ Concurrency ----------- -ReAct step state is attached to each agent instance for one ``__call__`` -lifecycle (see ``_react_step_state``). This matches AgentScope's own -``AgentBase`` design: a single ``_reply_task`` / ``_reply_id`` slot per -instance with no locking. Callers must not overlap ``await agent(...)`` on -the same instance across coroutines or threads without external -serialization, or telemetry and AgentScope's ``interrupt()`` semantics can -both be wrong. +The instrumentation supports concurrent ``await agent(...)`` calls on the +**same** ``AgentBase`` instance (e.g. TUI multi-tab, ``asyncio.gather``, +ASGI singletons). Per-call telemetry state is isolated through Python +``contextvars`` so that each asyncio task / thread observes its own +``_ReactStepState``; sibling concurrent invocations on the same agent never +clobber each other's react-step span, round counter or pending acting count. + +Two pieces of shared state are kept off the instance: + +1. ``_REACT_STATE`` (``ContextVar[_ReactStepState | None]``) holds the + per-call state. ReAct hooks read it via ``_REACT_STATE.get()`` and + no-op when the slot is ``None`` (i.e. the hook fired in a context that + did not opt into instrumentation). The state is never written to + ``agent`` attributes. + +2. ``_REACT_HOOK_REGISTRY`` is a process-global ``id(agent) -> ref count`` + map guarded by ``_REACT_HOOK_REGISTRY_LOCK``. The first concurrent call + on an agent registers four hooks under the fixed name + ``_REACT_HOOK_NAME``; subsequent concurrent calls only bump the ref + count. Hooks are removed only after the **last** outstanding call on + that agent unwinds. This is required because AgentScope's hook + registry is per-instance, and we must avoid both (a) registering the + same hook twice (which would double-fire ``_reasoning`` callbacks) and + (b) removing hooks while a sibling concurrent call still depends on + them. Stacked ``ChatModelBase`` / ``AgentBase`` implementations (e.g. proxies where each layer subclasses the base and ``__call__`` forwards to an inner model or @@ -35,11 +53,12 @@ import contextvars import logging +import threading import timeit -import uuid +import weakref from dataclasses import dataclass, field from functools import wraps -from typing import Any, AsyncGenerator +from typing import Any, AsyncGenerator, MutableMapping from opentelemetry.context import get_current as _get_current_context from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler @@ -56,7 +75,12 @@ logger = logging.getLogger(__name__) -_REACT_STEP_HOOK_PREFIX = "otel_react_step" +# Fixed hook name shared by all concurrent invocations on a given agent. +# Using a stable name (rather than a per-call uuid) lets AgentScope's own +# registry naturally deduplicate, so a single set of callbacks fires for +# each ``_reasoning`` / ``_acting`` regardless of how many concurrent +# ``__call__`` invocations are in flight on the same instance. +_REACT_HOOK_NAME = "otel_react_step" # Per-async-task nesting for stacked __call__ (proxy / decorator chains). _CHAT_MODEL_CALL_DEPTH = contextvars.ContextVar( @@ -68,6 +92,32 @@ default=0, ) +# Per-call ReAct step state. Stored in a ContextVar (not on the agent +# instance) so concurrent ``await agent(...)`` calls on the same instance +# observe isolated state. Hooks read via ``_REACT_STATE.get()`` and no-op +# when unset (i.e. fired in a context that did not opt into instrumentation). +_REACT_STATE: contextvars.ContextVar["_ReactStepState | None"] = ( + contextvars.ContextVar( + "opentelemetry_agentscope_react_state", + default=None, + ) +) + +# Per-agent ``outstanding __call__`` counter for ReAct hook lifetime +# management. Hooks are registered exactly once on the first concurrent +# call and removed only when the last outstanding call unwinds; this +# avoids both double-firing (multiple registrations under the same name) +# and premature removal while sibling calls still depend on the hooks. +# +# A ``WeakKeyDictionary`` is used (rather than ``id(agent) -> int``) so +# that an agent that is garbage-collected without a paired ``release`` +# (e.g. wrapper crashed between ``acquire`` and the protected region) +# automatically drops its registry entry, preventing a different agent +# from later being mis-recognised as already instrumented just because +# CPython recycled its memory address. +_REACT_HOOK_REGISTRY_LOCK = threading.Lock() +_REACT_HOOK_REGISTRY: MutableMapping[Any, int] = weakref.WeakKeyDictionary() + def _is_react_agent(agent_instance: Any) -> bool: """Check if an agent instance is a ReAct agent by duck-typing.""" @@ -76,17 +126,14 @@ def _is_react_agent(agent_instance: Any) -> bool: @dataclass class _ReactStepState: - """Per-agent-call state for React step span lifecycle. + """Per-call state for React step span lifecycle. - This object is stored on the agent instance only while a single - ``AgentBase.__call__`` is in progress. It is not safe for concurrent - overlapping ``__call__`` on the same instance (same assumption as - AgentScope's single ``_reply_task`` field). + Lives in the ``_REACT_STATE`` ``ContextVar`` rather than on the agent + instance, so concurrent ``__call__`` invocations on the same agent + each see their own state and never overwrite each other's + ``active_step`` / round counter / pending acting count. """ - hook_name: str = field( - default_factory=lambda: f"{_REACT_STEP_HOOK_PREFIX}_{uuid.uuid4().hex[:8]}" - ) react_round: int = 0 active_step: ReactStepInvocation | None = None original_context: Any = field(default=None) @@ -110,9 +157,7 @@ def _make_pre_reasoning_hook( """ def hook(agent_self: Any, kwargs: dict) -> None: - state: _ReactStepState | None = getattr( - agent_self, "_react_step_state", None - ) + state = _REACT_STATE.get() if state is None: return None @@ -146,9 +191,7 @@ def _make_post_reasoning_hook( """ def hook(agent_self: Any, kwargs: dict, output: Any) -> None: - state: _ReactStepState | None = getattr( - agent_self, "_react_step_state", None - ) + state = _REACT_STATE.get() if state is None: return None try: @@ -173,9 +216,7 @@ def _make_pre_acting_hook() -> Any: """Track nested _acting wrappers (subclass calls ``super()._acting``).""" def hook(agent_self: Any, kwargs: dict) -> None: - state: _ReactStepState | None = getattr( - agent_self, "_react_step_state", None - ) + state = _REACT_STATE.get() if state is None: return None state.acting_nesting += 1 @@ -194,9 +235,7 @@ def _make_post_acting_hook( """ def hook(agent_self: Any, kwargs: dict, output: Any) -> None: - state: _ReactStepState | None = getattr( - agent_self, "_react_step_state", None - ) + state = _REACT_STATE.get() if state is None: return None try: @@ -214,44 +253,112 @@ def hook(agent_self: Any, kwargs: dict, output: Any) -> None: return hook -def _register_react_hooks( - agent: Any, state: _ReactStepState, handler: ExtendedTelemetryHandler +_REACT_HOOK_TYPES = ( + "pre_reasoning", + "post_reasoning", + "pre_acting", + "post_acting", +) + + +def _acquire_react_hooks( + agent: Any, handler: ExtendedTelemetryHandler ) -> None: - """Register React step tracking hooks on an agent instance.""" - agent.register_instance_hook( - "pre_reasoning", - state.hook_name, - _make_pre_reasoning_hook(handler), - ) - agent.register_instance_hook( - "post_reasoning", - state.hook_name, - _make_post_reasoning_hook(handler), - ) - agent.register_instance_hook( - "pre_acting", - state.hook_name, - _make_pre_acting_hook(), - ) - agent.register_instance_hook( - "post_acting", - state.hook_name, - _make_post_acting_hook(handler), - ) + """Register ReAct step hooks on ``agent`` if no concurrent call has + already done so, and bump the per-instance reference count. + + Hooks are registered exactly once under the fixed name + ``_REACT_HOOK_NAME``. Subsequent concurrent invocations only bump the + refcount; the actual ``register_instance_hook`` calls happen only + when the count transitions from 0 to 1. This avoids registering the + same hook twice (which would cause AgentScope to double-fire each + ``_reasoning`` / ``_acting`` callback) while keeping the hooks alive + for as long as any in-flight call still needs them. + + If a partial registration fails (e.g. AgentScope rejects the third of + four ``register_instance_hook`` calls), any hooks already installed + in this transition are rolled back and the refcount is left at zero + so the agent ends up in a consistent, un-instrumented state and the + exception is propagated to the caller. + """ + with _REACT_HOOK_REGISTRY_LOCK: + count = _REACT_HOOK_REGISTRY.get(agent, 0) + if count == 0: + registrations = ( + ("pre_reasoning", _make_pre_reasoning_hook(handler)), + ("post_reasoning", _make_post_reasoning_hook(handler)), + ("pre_acting", _make_pre_acting_hook()), + ("post_acting", _make_post_acting_hook(handler)), + ) + installed: list = [] + try: + for hook_type, hook_fn in registrations: + agent.register_instance_hook( + hook_type, _REACT_HOOK_NAME, hook_fn + ) + installed.append(hook_type) + except Exception: + # Roll back any partially installed hooks so the agent is + # left exactly as we found it; the refcount stays at 0 + # because we never bumped it past this point. + for hook_type in installed: + try: + agent.remove_instance_hook( + hook_type, _REACT_HOOK_NAME + ) + except Exception: + logger.warning( + "AgentScope instrumentation: failed to roll " + "back %s hook on %s during acquire failure", + hook_type, + type(agent).__name__, + ) + raise + _REACT_HOOK_REGISTRY[agent] = count + 1 -def _remove_react_hooks(agent: Any, state: _ReactStepState) -> None: - """Remove React step tracking hooks from an agent instance.""" - for hook_type in ( - "pre_reasoning", - "post_reasoning", - "pre_acting", - "post_acting", - ): - try: - agent.remove_instance_hook(hook_type, state.hook_name) - except (ValueError, KeyError): - pass +def _release_react_hooks(agent: Any) -> None: + """Decrement the ref count and remove hooks when the last in-flight + call on ``agent`` unwinds. + + Hooks must not be removed while sibling concurrent invocations are + still running, otherwise their ``_reasoning`` / ``_acting`` callbacks + would silently stop firing mid-flight. + + Releases without a paired acquire (``count`` already 0) are silently + tolerated rather than raising: the only way to reach this state is + for a caller's ``try/finally`` to fire even though the matching + ``_acquire_react_hooks`` aborted before bumping the refcount, in + which case there is simply nothing to clean up. + """ + with _REACT_HOOK_REGISTRY_LOCK: + current = _REACT_HOOK_REGISTRY.get(agent, 0) + if current <= 0: + return + count = current - 1 + if count <= 0: + for hook_type in _REACT_HOOK_TYPES: + try: + agent.remove_instance_hook( + hook_type, _REACT_HOOK_NAME + ) + except (ValueError, KeyError): + # AgentScope already lost the hook (e.g. user code + # called remove_instance_hook directly or rebuilt the + # registry). Log and continue: leaving the refcount + # entry around is worse than a noisy log line. + logger.warning( + "AgentScope instrumentation: %s hook missing " + "on %s during release; continuing cleanup", + hook_type, + type(agent).__name__, + ) + try: + del _REACT_HOOK_REGISTRY[agent] + except KeyError: + pass + else: + _REACT_HOOK_REGISTRY[agent] = count class AgentScopeChatModelWrapper: @@ -473,15 +580,18 @@ async def async_wrapped_call( is_react = _is_react_agent(call_self) state: _ReactStepState | None = None + state_token = None if is_react: - # Single slot on the instance: safe only when this __call__ - # does not overlap another on the same agent (AgentScope - # uses the same pattern for _reply_task). + # Per-call state lives in a ContextVar so concurrent + # ``__call__`` invocations on the same agent each see + # an independent ``_ReactStepState``. Hooks fire under + # a single shared registration on the instance and + # dispatch to the active state via ``_REACT_STATE``. state = _ReactStepState( original_context=_get_current_context(), ) - call_self._react_step_state = state - _register_react_hooks(call_self, state, self._handler) + state_token = _REACT_STATE.set(state) + _acquire_react_hooks(call_self, self._handler) try: result = await original_call( @@ -519,10 +629,10 @@ async def async_wrapped_call( raise finally: - if is_react and state: - _remove_react_hooks(call_self, state) - if hasattr(call_self, "_react_step_state"): - del call_self._react_step_state + if is_react: + _release_react_hooks(call_self) + if state_token is not None: + _REACT_STATE.reset(state_token) except Exception as e: logger.exception("Error in agent instrumentation: %s", e) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/tests/test_concurrent_react_step.py b/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/tests/test_concurrent_react_step.py new file mode 100644 index 000000000..8465b7d94 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/tests/test_concurrent_react_step.py @@ -0,0 +1,345 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. + +# -*- coding: utf-8 -*- +"""Concurrency regression tests for AgentScope ReAct step instrumentation. + +Background +---------- +Before the ContextVar fix, per-call ``_ReactStepState`` was attached to the +agent instance (``call_self._react_step_state = state``) and ReAct hooks +were registered with a per-call uuid suffix. Two concurrent +``await agent(...)`` invocations on the same instance therefore: + +1. Overwrote each other's ``active_step`` reference, leaking a step span + that was never closed by ``stop_react_step``. +2. Each registered four hooks under unique names, so AgentScope's framework + fired *both* hook sets on every ``_reasoning`` / ``_acting`` callback, + doubling the start_react_step / stop_react_step counts and corrupting + the round counter. +3. ``del call_self._react_step_state`` in the first invocation's ``finally`` + block removed the *other* invocation's state mid-flight. + +These tests exercise the same hook plumbing that +``AgentScopeAgentWrapper.async_wrapped_call`` uses, but drive it directly so +the test does not depend on a real AgentScope ``ReActAgent``. They lock in +the ContextVar isolation + reference-counted hook registration contract. +""" + +from __future__ import annotations + +import asyncio +import threading + +import pytest + +from opentelemetry.instrumentation.agentscope._wrapper import ( + _REACT_HOOK_NAME, + _REACT_HOOK_REGISTRY, + _REACT_STATE, + _ReactStepState, + _acquire_react_hooks, + _release_react_hooks, +) + + +# -------------------------------------------------------------------------- +# Test doubles +# -------------------------------------------------------------------------- +class _ToolUseBlock: + pass + + +class _Reasoning: + """Minimal ``ChatResponse``-like stub for ``post_reasoning`` hooks.""" + + def __init__(self, n_tools: int) -> None: + self._n = n_tools + + def get_content_blocks(self, kind: str): + return [_ToolUseBlock()] * self._n if kind == "tool_use" else [] + + +class _FakeAgent: + """Mimics the slice of ``AgentBase`` used by the hook registration path. + + Real ``AgentBase`` exposes ``register_instance_hook`` / + ``remove_instance_hook`` and an ``_instance_pre_reasoning_hooks`` attribute + used for ReAct duck-typing. The hook registry stores one callable per + ``(hook_type, name)`` pair; firing a hook type invokes every registered + callable in registration order. + """ + + def __init__(self) -> None: + self._instance_pre_reasoning_hooks: dict = {} + self._hooks: "dict[str, dict[str, callable]]" = { + "pre_reasoning": {}, + "post_reasoning": {}, + "pre_acting": {}, + "post_acting": {}, + } + + def register_instance_hook(self, hook_type, name, fn) -> None: + self._hooks[hook_type][name] = fn + + def remove_instance_hook(self, hook_type, name) -> None: + self._hooks[hook_type].pop(name) + + def fire(self, hook_type: str, *args) -> None: + for fn in list(self._hooks[hook_type].values()): + fn(self, *args) + + +class _RecordingHandler: + """Thread-safe handler stub recording start/stop_react_step round numbers.""" + + def __init__(self) -> None: + self.start_calls: list = [] + self.stop_calls: list = [] + self._lock = threading.Lock() + + def start_react_step(self, inv, context=None) -> None: + with self._lock: + self.start_calls.append(inv.round) + + def stop_react_step(self, inv) -> None: + with self._lock: + self.stop_calls.append(inv.round) + + def fail_react_step(self, inv, err) -> None: + with self._lock: + self.stop_calls.append(("fail", inv.round)) + + +# -------------------------------------------------------------------------- +# Helpers +# -------------------------------------------------------------------------- +async def _drive_one_invocation( + agent: _FakeAgent, + handler: _RecordingHandler, + barrier: asyncio.Event, + n_arrived: list, + n_arrived_lock: threading.Lock, +) -> tuple: + """Drive a single fake ``__call__`` lifecycle through the ContextVar. + + Mirrors the production wrapper: + + - ``_REACT_STATE.set(state)`` opens an isolated state slot. + - ``_acquire_react_hooks`` registers the shared hook callables (or + bumps the refcount when another concurrent call already did so). + - Two ReAct rounds (one tool round, one stop round) run between the + acquire / release boundaries. + - ``_release_react_hooks`` + ``_REACT_STATE.reset(token)`` mirror the + wrapper's ``finally`` cleanup. + """ + state = _ReactStepState(original_context=None) + token = _REACT_STATE.set(state) + _acquire_react_hooks(agent, handler) + try: + # ----- ReAct round 1: one tool_call -> one acting iteration ----- + agent.fire("pre_reasoning", {}) + agent.fire("post_reasoning", {}, _Reasoning(n_tools=1)) + agent.fire("pre_acting", {}) + agent.fire("post_acting", {}, None) + + # Wait until both concurrent invocations finish round 1, so + # round 2 also runs while the registry refcount is at 2 (the + # critical window where the buggy implementation leaked spans). + with n_arrived_lock: + n_arrived[0] += 1 + while n_arrived[0] < 2: + await asyncio.sleep(0) + barrier.set() + await barrier.wait() + + # ----- ReAct round 2: finish_reason == "stop" ----- + agent.fire("pre_reasoning", {}) + agent.fire("post_reasoning", {}, _Reasoning(n_tools=0)) + + # The wrapper closes a leftover step (no acting -> finish_reason "stop"). + if state.active_step is not None: + handler.stop_react_step(state.active_step) + state.active_step = None + + return ( + state.react_round, + list(handler.start_calls), + list(handler.stop_calls), + ) + finally: + _release_react_hooks(agent) + _REACT_STATE.reset(token) + + +# -------------------------------------------------------------------------- +# Tests +# -------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_state_isolated_across_concurrent_invocations(): + """Regression: concurrent invocations must produce 4 starts / 4 stops. + + With the old instance-attribute state, the second invocation's + ``call_self._react_step_state = state`` assignment would clobber the + first invocation's ``active_step``, leaving its round-1 step span + open forever (3 starts, 3 stops, broken parent/child topology). + """ + agent = _FakeAgent() + handler = _RecordingHandler() + barrier = asyncio.Event() + n_arrived = [0] + n_arrived_lock = threading.Lock() + + results = await asyncio.gather( + _drive_one_invocation(agent, handler, barrier, n_arrived, n_arrived_lock), + _drive_one_invocation(agent, handler, barrier, n_arrived, n_arrived_lock), + ) + + # Each invocation observed its own round counter advancing 1 -> 2, + # untouched by the sibling. + for round_, _, _ in results: + assert round_ == 2, ( + "react_round counter leaked across concurrent invocations: " + f"got {round_}, expected 2" + ) + + # Two invocations * two rounds = exactly 4 starts and 4 stops, no leak. + assert len(handler.start_calls) == 4, ( + "expected 4 start_react_step (2 invocations x 2 rounds), got " + f"{len(handler.start_calls)}: {handler.start_calls}" + ) + assert len(handler.stop_calls) == 4, ( + "expected 4 stop_react_step (2 invocations x 2 rounds), got " + f"{len(handler.stop_calls)}: {handler.stop_calls} " + "- a leaked step span indicates broken concurrency isolation" + ) + + # round=1 must fire exactly twice (once per invocation). With the old + # uuid-suffixed double-registration bug, AgentScope would invoke both + # hook sets and round=1 would appear 4 times. + assert handler.start_calls.count(1) == 2, ( + f"round=1 fired {handler.start_calls.count(1)} times, expected 2 " + "- duplicate hook registration is double-firing _reasoning" + ) + assert handler.start_calls.count(2) == 2 + + +@pytest.mark.asyncio +async def test_hook_registry_releases_after_last_invocation(): + """After the last concurrent invocation unwinds, hooks are removed + and the registry entry for that agent is gone.""" + agent = _FakeAgent() + handler = _RecordingHandler() + barrier = asyncio.Event() + n_arrived = [0] + n_arrived_lock = threading.Lock() + + await asyncio.gather( + _drive_one_invocation(agent, handler, barrier, n_arrived, n_arrived_lock), + _drive_one_invocation(agent, handler, barrier, n_arrived, n_arrived_lock), + ) + + assert agent not in _REACT_HOOK_REGISTRY, ( + "hook refcount entry leaked after last invocation" + ) + for hook_type in ( + "pre_reasoning", + "post_reasoning", + "pre_acting", + "post_acting", + ): + assert _REACT_HOOK_NAME not in agent._hooks[hook_type], ( + f"{hook_type} hook still registered after last invocation" + ) + + +@pytest.mark.asyncio +async def test_hook_registry_keeps_hooks_during_overlapping_calls(): + """Hooks must stay registered while a sibling concurrent call still + needs them; they are removed only after the **last** call unwinds.""" + agent = _FakeAgent() + handler = _RecordingHandler() + + # First call enters the critical region. + state_a = _ReactStepState(original_context=None) + token_a = _REACT_STATE.set(state_a) + _acquire_react_hooks(agent, handler) + assert _REACT_HOOK_REGISTRY.get(agent) == 1 + + # Second concurrent call enters; refcount must bump, hooks unchanged. + state_b = _ReactStepState(original_context=None) + token_b = _REACT_STATE.set(state_b) + _acquire_react_hooks(agent, handler) + assert _REACT_HOOK_REGISTRY.get(agent) == 2 + for hook_type in agent._hooks: + assert _REACT_HOOK_NAME in agent._hooks[hook_type] + + # First call unwinds -> refcount drops to 1, hooks must remain so + # that the second call's pending ``_reasoning`` / ``_acting`` keep firing. + _release_react_hooks(agent) + _REACT_STATE.reset(token_b) + assert _REACT_HOOK_REGISTRY.get(agent) == 1 + for hook_type in agent._hooks: + assert _REACT_HOOK_NAME in agent._hooks[hook_type], ( + f"{hook_type} hook removed prematurely while another call " + "was still in flight" + ) + + # Second call unwinds -> hooks finally removed. + _release_react_hooks(agent) + _REACT_STATE.reset(token_a) + assert agent not in _REACT_HOOK_REGISTRY + for hook_type in agent._hooks: + assert _REACT_HOOK_NAME not in agent._hooks[hook_type] + + +@pytest.mark.asyncio +async def test_acquire_rolls_back_on_partial_registration_failure(): + """When AgentScope rejects one of the four hook registrations, the + already-installed hooks must be rolled back and the refcount must + stay at zero so the agent ends up un-instrumented and the exception + propagates to the caller (i.e. ``acquire`` is all-or-nothing).""" + + class _FailingAgent(_FakeAgent): + def register_instance_hook(self, hook_type, name, fn): + if hook_type == "pre_acting": + raise RuntimeError("simulated registration failure") + super().register_instance_hook(hook_type, name, fn) + + agent = _FailingAgent() + handler = _RecordingHandler() + + with pytest.raises(RuntimeError, match="simulated registration failure"): + _acquire_react_hooks(agent, handler) + + # No partial state left behind. + assert agent not in _REACT_HOOK_REGISTRY + for hook_type in agent._hooks: + assert _REACT_HOOK_NAME not in agent._hooks[hook_type], ( + f"{hook_type} hook leaked through after acquire rollback" + ) + + # A subsequent successful acquire on a fresh agent must work fine - + # i.e. the failed one did not poison module-level state. + fresh_agent = _FakeAgent() + _acquire_react_hooks(fresh_agent, handler) + try: + assert _REACT_HOOK_REGISTRY.get(fresh_agent) == 1 + finally: + _release_react_hooks(fresh_agent) + + +@pytest.mark.asyncio +async def test_release_without_prior_acquire_is_safe_noop(): + """If a caller's ``finally`` invokes release after an acquire failed + (and therefore never bumped the refcount), the release must not + raise or underflow the registry entry.""" + agent = _FakeAgent() + + # No prior acquire on this agent. + _release_react_hooks(agent) + _release_react_hooks(agent) # idempotent + + assert agent not in _REACT_HOOK_REGISTRY diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/tests/test_react_step_reentrancy.py b/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/tests/test_react_step_reentrancy.py index 5f526cc3b..5ec0e1ac8 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/tests/test_react_step_reentrancy.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-agentscope/tests/test_react_step_reentrancy.py @@ -9,7 +9,10 @@ from types import SimpleNamespace from unittest.mock import MagicMock +import pytest + from opentelemetry.instrumentation.agentscope._wrapper import ( + _REACT_STATE, _make_post_acting_hook, _make_post_reasoning_hook, _make_pre_acting_hook, @@ -28,10 +31,28 @@ def get_content_blocks(self, kind: str): return [{"type": "tool_use"}] * self._n -def test_nested_reasoning_only_outermost_opens_step_and_counts_tools(): - handler = MagicMock() +@pytest.fixture +def react_state(): + """Per-test ReactStepState bound to the ``_REACT_STATE`` ContextVar. + + Hooks now read state via ``_REACT_STATE.get()`` instead of + ``getattr(agent, "_react_step_state")``; tests must publish state on + the ContextVar and reset it on teardown to keep tests isolated. + """ state = _ReactStepState(original_context=None) - agent = SimpleNamespace(_react_step_state=state) + token = _REACT_STATE.set(state) + try: + yield state + finally: + _REACT_STATE.reset(token) + + +def test_nested_reasoning_only_outermost_opens_step_and_counts_tools( + react_state, +): + handler = MagicMock() + state = react_state + agent = SimpleNamespace() pre = _make_pre_reasoning_hook(handler) post = _make_post_reasoning_hook(handler) @@ -54,12 +75,12 @@ def test_nested_reasoning_only_outermost_opens_step_and_counts_tools(): assert state.pending_acting_count == 2 -def test_nested_acting_only_outermost_decrements_pending(): +def test_nested_acting_only_outermost_decrements_pending(react_state): handler = MagicMock() - state = _ReactStepState(original_context=None) + state = react_state state.pending_acting_count = 2 state.active_step = MagicMock() - agent = SimpleNamespace(_react_step_state=state) + agent = SimpleNamespace() pre_a = _make_pre_acting_hook() post_a = _make_post_acting_hook(handler) @@ -83,3 +104,28 @@ def test_nested_acting_only_outermost_decrements_pending(): post_a(agent, {}, None) assert state.pending_acting_count == 0 assert handler.stop_react_step.call_count == 1 + + +def test_hook_no_ops_when_react_state_not_set(): + """When ``_REACT_STATE`` is unset (default None), hooks must not crash + or call the handler. This is the safety valve that lets the same + hook callable serve concurrent contexts: only contexts that opt in + via ``_REACT_STATE.set(...)`` see telemetry. + """ + handler = MagicMock() + pre = _make_pre_reasoning_hook(handler) + post = _make_post_reasoning_hook(handler) + pre_a = _make_pre_acting_hook() + post_a = _make_post_acting_hook(handler) + + agent = SimpleNamespace() + # _REACT_STATE intentionally not set in this context. + assert _REACT_STATE.get() is None + + pre(agent, {}) + post(agent, {}, _MsgWithToolBlocks(1)) + pre_a(agent, {}) + post_a(agent, {}, None) + + assert handler.start_react_step.call_count == 0 + assert handler.stop_react_step.call_count == 0