Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,31 @@

Concurrency
-----------
ReAct step state is attached to each agent instance for one ``__call__``
lifecycle (see ``_react_step_state``). This matches AgentScope's own
``AgentBase`` design: a single ``_reply_task`` / ``_reply_id`` slot per
instance with no locking. Callers must not overlap ``await agent(...)`` on
the same instance across coroutines or threads without external
serialization, or telemetry and AgentScope's ``interrupt()`` semantics can
both be wrong.
The instrumentation supports concurrent ``await agent(...)`` calls on the
**same** ``AgentBase`` instance (e.g. TUI multi-tab, ``asyncio.gather``,
ASGI singletons). Per-call telemetry state is isolated through Python
``contextvars`` so that each asyncio task / thread observes its own
``_ReactStepState``; sibling concurrent invocations on the same agent never
clobber each other's react-step span, round counter or pending acting count.

Two pieces of shared state are kept off the instance:

1. ``_REACT_STATE`` (``ContextVar[_ReactStepState | None]``) holds the
per-call state. ReAct hooks read it via ``_REACT_STATE.get()`` and
no-op when the slot is ``None`` (i.e. the hook fired in a context that
did not opt into instrumentation). The state is never written to
``agent`` attributes.

2. ``_REACT_HOOK_REGISTRY`` is a process-global ``agent -> ref count``
``WeakKeyDictionary`` (keyed by the agent object itself, not ``id(agent)``)
guarded by ``_REACT_HOOK_REGISTRY_LOCK``. The first concurrent call
on an agent registers four hooks under the fixed name
``_REACT_HOOK_NAME``; subsequent concurrent calls only bump the ref
count. Hooks are removed only after the **last** outstanding call on
that agent unwinds. This is required because AgentScope's hook
Comment on lines +34 to +39
registry is per-instance, and we must avoid both (a) registering the
same hook twice (which would double-fire ``_reasoning`` callbacks) and
(b) removing hooks while a sibling concurrent call still depends on
them.

Stacked ``ChatModelBase`` / ``AgentBase`` implementations (e.g. proxies where
each layer subclasses the base and ``__call__`` forwards to an inner model or
Expand All @@ -35,11 +53,12 @@

import contextvars
import logging
import threading
import timeit
import uuid
import weakref
from dataclasses import dataclass, field
from functools import wraps
from typing import Any, AsyncGenerator
from typing import Any, AsyncGenerator, MutableMapping

from opentelemetry.context import get_current as _get_current_context
from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler
Expand All @@ -56,7 +75,12 @@

logger = logging.getLogger(__name__)

_REACT_STEP_HOOK_PREFIX = "otel_react_step"
# Fixed hook name shared by all concurrent invocations on a given agent.
# Using a stable name (rather than a per-call uuid) lets AgentScope's own
# registry naturally deduplicate, so a single set of callbacks fires for
# each ``_reasoning`` / ``_acting`` regardless of how many concurrent
# ``__call__`` invocations are in flight on the same instance.
_REACT_HOOK_NAME = "otel_react_step"

# Per-async-task nesting for stacked __call__ (proxy / decorator chains).
_CHAT_MODEL_CALL_DEPTH = contextvars.ContextVar(
Expand All @@ -68,6 +92,32 @@
default=0,
)

# Per-call ReAct step state. Stored in a ContextVar (not on the agent
# instance) so concurrent ``await agent(...)`` calls on the same instance
# observe isolated state. Hooks read via ``_REACT_STATE.get()`` and no-op
# when unset (i.e. fired in a context that did not opt into instrumentation).
_REACT_STATE: contextvars.ContextVar["_ReactStepState | None"] = (
contextvars.ContextVar(
"opentelemetry_agentscope_react_state",
default=None,
)
)

# Per-agent ``outstanding __call__`` counter for ReAct hook lifetime
# management. Hooks are registered exactly once on the first concurrent
# call and removed only when the last outstanding call unwinds; this
# avoids both double-firing (multiple registrations under the same name)
# and premature removal while sibling calls still depend on the hooks.
#
# A ``WeakKeyDictionary`` is used (rather than ``id(agent) -> int``) so
# that an agent that is garbage-collected without a paired ``release``
# (e.g. wrapper crashed between ``acquire`` and the protected region)
# automatically drops its registry entry, preventing a different agent
# from later being mis-recognised as already instrumented just because
# CPython recycled its memory address.
_REACT_HOOK_REGISTRY_LOCK = threading.Lock()
_REACT_HOOK_REGISTRY: MutableMapping[Any, int] = weakref.WeakKeyDictionary()


def _is_react_agent(agent_instance: Any) -> bool:
"""Check if an agent instance is a ReAct agent by duck-typing."""
Expand All @@ -76,17 +126,14 @@ def _is_react_agent(agent_instance: Any) -> bool:

@dataclass
class _ReactStepState:
"""Per-agent-call state for React step span lifecycle.
"""Per-call state for React step span lifecycle.

This object is stored on the agent instance only while a single
``AgentBase.__call__`` is in progress. It is not safe for concurrent
overlapping ``__call__`` on the same instance (same assumption as
AgentScope's single ``_reply_task`` field).
Lives in the ``_REACT_STATE`` ``ContextVar`` rather than on the agent
instance, so concurrent ``__call__`` invocations on the same agent
each see their own state and never overwrite each other's
``active_step`` / round counter / pending acting count.
"""

hook_name: str = field(
default_factory=lambda: f"{_REACT_STEP_HOOK_PREFIX}_{uuid.uuid4().hex[:8]}"
)
react_round: int = 0
active_step: ReactStepInvocation | None = None
original_context: Any = field(default=None)
Expand All @@ -110,9 +157,7 @@ def _make_pre_reasoning_hook(
"""

def hook(agent_self: Any, kwargs: dict) -> None:
state: _ReactStepState | None = getattr(
agent_self, "_react_step_state", None
)
state = _REACT_STATE.get()
if state is None:
return None

Expand Down Expand Up @@ -146,9 +191,7 @@ def _make_post_reasoning_hook(
"""

def hook(agent_self: Any, kwargs: dict, output: Any) -> None:
state: _ReactStepState | None = getattr(
agent_self, "_react_step_state", None
)
state = _REACT_STATE.get()
if state is None:
return None
try:
Expand All @@ -173,9 +216,7 @@ def _make_pre_acting_hook() -> Any:
"""Track nested _acting wrappers (subclass calls ``super()._acting``)."""

def hook(agent_self: Any, kwargs: dict) -> None:
state: _ReactStepState | None = getattr(
agent_self, "_react_step_state", None
)
state = _REACT_STATE.get()
if state is None:
return None
state.acting_nesting += 1
Expand All @@ -194,9 +235,7 @@ def _make_post_acting_hook(
"""

def hook(agent_self: Any, kwargs: dict, output: Any) -> None:
state: _ReactStepState | None = getattr(
agent_self, "_react_step_state", None
)
state = _REACT_STATE.get()
if state is None:
return None
try:
Expand All @@ -214,44 +253,112 @@ def hook(agent_self: Any, kwargs: dict, output: Any) -> None:
return hook


def _register_react_hooks(
agent: Any, state: _ReactStepState, handler: ExtendedTelemetryHandler
_REACT_HOOK_TYPES = (
"pre_reasoning",
"post_reasoning",
"pre_acting",
"post_acting",
)


def _acquire_react_hooks(
    agent: Any, handler: ExtendedTelemetryHandler
) -> None:
    """Register ReAct step hooks on ``agent`` if no concurrent call has
    already done so, and bump the per-instance reference count.

    Hooks are registered exactly once under the fixed name
    ``_REACT_HOOK_NAME``. Subsequent concurrent invocations only bump the
    refcount; the actual ``register_instance_hook`` calls happen only
    when the count transitions from 0 to 1. This avoids registering the
    same hook twice while keeping the hooks alive for as long as any
    in-flight call still needs them.

    If a partial registration fails (e.g. the third of the four
    ``register_instance_hook`` calls raises), any hooks already installed
    in this transition are rolled back and the refcount is left untouched
    (still zero), so the agent ends up un-instrumented and the original
    exception is propagated to the caller.

    Args:
        agent: The agent instance to instrument. It is used as a key of
            ``_REACT_HOOK_REGISTRY`` and must expose
            ``register_instance_hook`` / ``remove_instance_hook``.
        handler: Telemetry handler passed to the hook factories that need
            it (pre/post reasoning and post acting).

    Raises:
        Exception: Whatever ``agent.register_instance_hook`` raised, after
            the rollback of the partially installed hooks.
    """
    # The whole check-register-increment sequence runs under the lock so
    # two concurrent first calls cannot both observe a count of zero.
    with _REACT_HOOK_REGISTRY_LOCK:
        count = _REACT_HOOK_REGISTRY.get(agent, 0)
        if count == 0:
            registrations = (
                ("pre_reasoning", _make_pre_reasoning_hook(handler)),
                ("post_reasoning", _make_post_reasoning_hook(handler)),
                ("pre_acting", _make_pre_acting_hook()),
                ("post_acting", _make_post_acting_hook(handler)),
            )
            # Hook types successfully installed so far in this transition;
            # only these need to be undone if a later registration fails.
            installed: list[str] = []
            try:
                for hook_type, hook_fn in registrations:
                    agent.register_instance_hook(
                        hook_type, _REACT_HOOK_NAME, hook_fn
                    )
                    installed.append(hook_type)
            except Exception:
                # Roll back any partially installed hooks so the agent is
                # left as we found it; the refcount stays at 0 because the
                # increment below is never reached.
                for hook_type in installed:
                    try:
                        agent.remove_instance_hook(
                            hook_type, _REACT_HOOK_NAME
                        )
                    except Exception:
                        # Best effort: keep rolling back the remaining
                        # hooks and still re-raise the original error.
                        logger.warning(
                            "AgentScope instrumentation: failed to roll "
                            "back %s hook on %s during acquire failure",
                            hook_type,
                            type(agent).__name__,
                        )
                raise
        _REACT_HOOK_REGISTRY[agent] = count + 1


def _remove_react_hooks(agent: Any, state: _ReactStepState) -> None:
"""Remove React step tracking hooks from an agent instance."""
for hook_type in (
"pre_reasoning",
"post_reasoning",
"pre_acting",
"post_acting",
):
try:
agent.remove_instance_hook(hook_type, state.hook_name)
except (ValueError, KeyError):
pass
def _release_react_hooks(agent: Any) -> None:
    """Decrement the ref count and remove hooks when the last in-flight
    call on ``agent`` unwinds.

    Hooks must not be removed while sibling concurrent invocations are
    still running, so while more than one call is outstanding this only
    decrements the counter stored in ``_REACT_HOOK_REGISTRY``.

    Releases without a paired acquire (count already 0 or missing) are
    silently tolerated rather than raising: there is nothing to clean up.

    On the last release the registry entry is always dropped, even if
    ``agent.remove_instance_hook`` raises something other than
    ``ValueError`` / ``KeyError``. Otherwise a stale non-zero count would
    make later acquires skip hook registration for this agent.

    Args:
        agent: The agent instance whose outstanding-call count should be
            decremented.

    Raises:
        Exception: Any error from ``agent.remove_instance_hook`` other than
            ``ValueError`` / ``KeyError`` is propagated (after the registry
            entry has been removed).
    """
    with _REACT_HOOK_REGISTRY_LOCK:
        outstanding = _REACT_HOOK_REGISTRY.get(agent, 0)
        if outstanding <= 0:
            # Unpaired release: nothing was acquired, nothing to undo.
            return

        if outstanding > 1:
            # Sibling calls are still in flight; keep the hooks installed.
            _REACT_HOOK_REGISTRY[agent] = outstanding - 1
            return

        # Last outstanding call on this agent: tear the hooks down.
        try:
            for hook_type in _REACT_HOOK_TYPES:
                try:
                    agent.remove_instance_hook(hook_type, _REACT_HOOK_NAME)
                except (ValueError, KeyError):
                    # The hook is already gone (e.g. user code removed it
                    # directly). Log and keep cleaning up the rest.
                    logger.warning(
                        "AgentScope instrumentation: %s hook missing "
                        "on %s during release; continuing cleanup",
                        hook_type,
                        type(agent).__name__,
                    )
        finally:
            # Drop the entry unconditionally so an unexpected failure above
            # cannot strand a non-zero refcount for this agent.
            _REACT_HOOK_REGISTRY.pop(agent, None)


class AgentScopeChatModelWrapper:
Expand Down Expand Up @@ -473,15 +580,18 @@ async def async_wrapped_call(

is_react = _is_react_agent(call_self)
state: _ReactStepState | None = None
state_token = None
if is_react:
# Single slot on the instance: safe only when this __call__
# does not overlap another on the same agent (AgentScope
# uses the same pattern for _reply_task).
# Per-call state lives in a ContextVar so concurrent
# ``__call__`` invocations on the same agent each see
# an independent ``_ReactStepState``. Hooks fire under
# a single shared registration on the instance and
# dispatch to the active state via ``_REACT_STATE``.
state = _ReactStepState(
original_context=_get_current_context(),
)
call_self._react_step_state = state
_register_react_hooks(call_self, state, self._handler)
state_token = _REACT_STATE.set(state)
_acquire_react_hooks(call_self, self._handler)

try:
result = await original_call(
Expand Down Expand Up @@ -519,10 +629,10 @@ async def async_wrapped_call(
raise

finally:
if is_react and state:
_remove_react_hooks(call_self, state)
if hasattr(call_self, "_react_step_state"):
del call_self._react_step_state
if is_react:
_release_react_hooks(call_self)
if state_token is not None:
_REACT_STATE.reset(state_token)

except Exception as e:
logger.exception("Error in agent instrumentation: %s", e)
Expand Down
Loading
Loading