Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- **Multi-agent**: propagate trace context to child CoPaw processes via
``execute_shell_command``; suppress duplicate entry span in child
([#164](https://github.com/alibaba/loongsuite-python-agent/pull/164))

## Version 0.4.0 (2026-04-03)

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,35 @@ LLM call inside the agent.
Calls to models, tools, and other AgentScope primitives are **not** duplicated
here: use AgentScope (and your existing model client) instrumentations alongside
this package so they appear as child spans under this entry when configured.

## Sub-agent CLI and trace continuity (`multi_agent_collaboration`)

When a parent CoPaw agent runs a **child** CoPaw process via AgentScope’s
`execute_shell_command` (for example `copaw agents chat`), the default
subprocess inherits `os.environ` only and the trace would **break** across
processes.

This package also wraps `agentscope.tool._coding._shell.execute_shell_command`.
For commands whose string contains **`copaw`**, **`agents`**, and **`chat`**, it:

1. Merges the current trace context into the subprocess `env` (W3C
`TRACEPARENT` / `TRACESTATE` and any fields from your configured global
propagators, using the same uppercase-env convention as OpenTelemetry’s
[environment carrier](https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-api/src/opentelemetry/propagators/_envcarrier.py)).
2. Sets **`COPAW_OTEL_CHILD_AGENT=1`** so the child process can recognize the
call as a linked sub-agent.

In the child process, `AgentRunner.query_handler` **does not** create
`enter_ai_application_system`. It only `attach`es the extracted parent
context so AgentScope (and other) spans continue **in the same trace** as the
parent. Configure **`OTEL_PROPAGATORS`** to include `baggage` if you rely on
`session_id` / `user_id` baggage from the parent entry across this boundary.

Advanced: set **`COPAW_OTEL_INJECT_SHELL_TRACE=1`** to inject context for
**every** `execute_shell_command` invocation (still sets
`COPAW_OTEL_CHILD_AGENT=1`). Use only if **all** such children are CoPaw
agents that should suppress entry; otherwise unrelated shell children could
incorrectly skip their entry span.

The child CoPaw process must load this instrumentation (and OTel export
configuration) the same way as the parent for spans to export correctly.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
LoongSuite CoPaw instrumentation (``copaw >= 0.1.0``).

Instruments ``AgentRunner.query_handler`` with ``ExtendedTelemetryHandler.entry``
(``enter_ai_application_system``). Agent / tool / LLM spans come from AgentScope
and other instrumentations.
(``enter_ai_application_system``), and when AgentScope is present,
``execute_shell_command`` so child ``copaw agents chat`` subprocesses receive
trace context and optional entry suppression (``COPAW_OTEL_CHILD_AGENT``).
Agent / tool / LLM spans otherwise come from AgentScope and other instrumentations.

Usage
-----
Expand All @@ -37,6 +39,10 @@

from wrapt import wrap_function_wrapper

from opentelemetry.instrumentation.copaw._shell_patch import (
_MODULE_SHELL,
make_execute_shell_command_wrapper,
)
from opentelemetry.instrumentation.copaw.package import _instruments
from opentelemetry.instrumentation.copaw.patch import (
_MODULE_RUNNER,
Expand All @@ -57,6 +63,7 @@ class CoPawInstrumentor(BaseInstrumentor):
def __init__(self) -> None:
super().__init__()
self._handler: ExtendedTelemetryHandler | None = None
self._shell_command_wrapped = False

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Expand All @@ -79,9 +86,36 @@ def _instrument(self, **kwargs: Any) -> None:
)
logger.debug("Instrumented CoPaw AgentRunner.query_handler")

try:
shell_wrapper = make_execute_shell_command_wrapper()
wrap_function_wrapper(
_MODULE_SHELL,
"execute_shell_command",
shell_wrapper,
)
self._shell_command_wrapped = True
logger.debug("Instrumented AgentScope execute_shell_command")
except ImportError:
logger.debug(
"agentscope.tool._coding._shell not importable; "
"skipping execute_shell_command hook"
)

def _uninstrument(self, **kwargs: Any) -> None:
del kwargs
self._handler = None
if self._shell_command_wrapped:
self._shell_command_wrapped = False
try:
import agentscope.tool._coding._shell as shell_module # noqa: PLC0415

unwrap(shell_module, "execute_shell_command")
logger.debug("Uninstrumented AgentScope execute_shell_command")
except Exception as exc:
logger.warning(
"Failed to uninstrument execute_shell_command: %s",
exc,
)
try:
import copaw.app.runner.runner as runner_module # noqa: PLC0415

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared constants for CoPaw subprocess trace linking."""

from __future__ import annotations

import os

# Set on the child process environment when the parent spawns ``copaw agents chat``
# (or when ``COPAW_OTEL_INJECT_SHELL_TRACE`` forces injection). The CoPaw
# entry wrapper skips ``enter_ai_application_system`` when this is "1".
COPAW_OTEL_CHILD_AGENT = "COPAW_OTEL_CHILD_AGENT"

# When set to a truthy value, inject trace context into every shell command
# (still sets COPAW_OTEL_CHILD_AGENT — use only if all such children are CoPaw
# agents that should suppress Entry).
COPAW_OTEL_INJECT_SHELL_TRACE = "COPAW_OTEL_INJECT_SHELL_TRACE"


def is_copaw_child_agent_process() -> bool:
return os.environ.get(COPAW_OTEL_CHILD_AGENT) == "1"
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Environment variable carrier for trace context (vendored from upstream OTel API).

The published ``opentelemetry-api`` wheels may not yet ship
``opentelemetry.propagators._envcarrier``; this module mirrors that
implementation so subprocess ``env=`` injection works consistently.
See: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-api/src/opentelemetry/propagators/_envcarrier.py
"""

from __future__ import annotations

import os
from collections.abc import MutableMapping
from typing import Dict, Iterable, List, Mapping, Optional

from opentelemetry.propagators.textmap import Getter, Setter


class EnvironmentGetter(Getter[Mapping[str, str]]):
"""Getter for extracting context from a snapshot of ``os.environ``."""

def __init__(self) -> None:
self.carrier: Dict[str, str] = {
k.lower(): v for k, v in os.environ.items()
}

def get(self, carrier: Mapping[str, str], key: str) -> Optional[List[str]]:
del carrier # interface compatibility
val = self.carrier.get(key.lower())
if val is None:
return None
if isinstance(val, Iterable) and not isinstance(val, str):
return list(val)
return [val]

def keys(self, carrier: Mapping[str, str]) -> List[str]:
del carrier
return list(self.carrier.keys())


class EnvironmentSetter(Setter[MutableMapping[str, str]]):
"""Setter for building an ``env`` dict (keys stored uppercase)."""

def set(
self, carrier: MutableMapping[str, str], key: str, value: str
) -> None:
carrier[key.upper()] = value
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Inject trace context into AgentScope ``execute_shell_command`` subprocess env."""

from __future__ import annotations

import asyncio
import logging
import os
from typing import Any, Callable

from opentelemetry import propagate

from ._constants import (
COPAW_OTEL_CHILD_AGENT,
COPAW_OTEL_INJECT_SHELL_TRACE,
)
from ._env_carrier import EnvironmentSetter

logger = logging.getLogger(__name__)

_MODULE_SHELL = "agentscope.tool._coding._shell"
_PATCH_TARGET = "execute_shell_command"


def _truthy_env(name: str) -> bool:
return os.environ.get(name, "").lower() in ("1", "true", "yes", "on")


def should_inject_trace_for_shell_command(command: str) -> bool:
"""Return True if trace env should be merged for this shell command."""
if _truthy_env(COPAW_OTEL_INJECT_SHELL_TRACE):
return True
c = command.lower()
return "copaw" in c and "agents" in c and "chat" in c


async def _run_shell_command_with_env(
command: str,
timeout: int,
env: dict[str, str],
) -> Any:
"""Same behavior as agentscope ``execute_shell_command``, with explicit *env*."""
from agentscope.message import TextBlock # noqa: PLC0415
from agentscope.tool._response import ToolResponse # noqa: PLC0415

proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
bufsize=0,
env=env,
)

try:
await asyncio.wait_for(proc.wait(), timeout=timeout)
stdout, stderr = await proc.communicate()
Comment on lines +68 to +69
stdout_str = stdout.decode("utf-8")
stderr_str = stderr.decode("utf-8")
returncode = proc.returncode

except asyncio.TimeoutError:
stderr_suffix = (
f"TimeoutError: The command execution exceeded "
f"the timeout of {timeout} seconds."
)
returncode = -1
try:
proc.terminate()
stdout, stderr = await proc.communicate()
stdout_str = stdout.decode("utf-8")
stderr_str = stderr.decode("utf-8")
if stderr_str:
stderr_str += f"\n{stderr_suffix}"
else:
stderr_str = stderr_suffix
except ProcessLookupError:
stdout_str = ""
stderr_str = stderr_suffix
Comment on lines +80 to +91

return ToolResponse(
content=[
TextBlock(
type="text",
text=(
f"<returncode>{returncode}</returncode>"
f"<stdout>{stdout_str}</stdout>"
f"<stderr>{stderr_str}</stderr>"
),
),
],
)


def _build_subprocess_env() -> dict[str, str]:
merged = os.environ.copy()
delta: dict[str, str] = {}
try:
propagate.get_global_textmap().inject(
delta, setter=EnvironmentSetter()
)
except Exception:
logger.debug("Failed to inject trace into env", exc_info=True)
return merged
merged.update(delta)
merged[COPAW_OTEL_CHILD_AGENT] = "1"
return merged


def make_execute_shell_command_wrapper() -> Callable[..., Any]:
"""Factory for ``wrapt`` wrapper around ``execute_shell_command``."""

async def execute_shell_command_wrapper(
wrapped: Any,
instance: Any,
args: Any,
kwargs: Any,
) -> Any:
del instance
command = ""
if args:
command = str(args[0])
elif kwargs.get("command") is not None:
command = str(kwargs["command"])

timeout = 300
if len(args) >= 2:
timeout = int(args[1])
elif "timeout" in kwargs:
timeout = int(kwargs["timeout"])

if not should_inject_trace_for_shell_command(command):
return await wrapped(*args, **kwargs)

env = _build_subprocess_env()
try:
return await _run_shell_command_with_env(command, timeout, env)
except Exception:
logger.debug(
"%s.%s inject path failed; falling back to original",
_MODULE_SHELL,
_PATCH_TARGET,
exc_info=True,
)
return await wrapped(*args, **kwargs)
Comment on lines +147 to +157

return execute_shell_command_wrapper
Loading
Loading