-
Notifications
You must be signed in to change notification settings - Fork 438
feat: subagent activity viewing #1513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
supreme-gg-gg
wants to merge
8
commits into
kagent-dev:main
Choose a base branch
from
supreme-gg-gg:jetc/feat/subagent-viewing
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+869
−320
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
b3435ba
feat: subagent viewing
supreme-gg-gg ede4a1e
subagent fixes
supreme-gg-gg 014c871
docs
supreme-gg-gg c9e1627
cleanup
supreme-gg-gg 59d684d
fix tests
supreme-gg-gg f963623
fix comments and rebase
supreme-gg-gg eec886e
docs and fix lint
supreme-gg-gg c3b5bcd
Merge branch 'main' into jetc/feat/subagent-viewing
supreme-gg-gg File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| # A2A Subagents | ||
|
|
||
| Kagent allows users to add subagents (other agents running on Kagent or remotely) as tools to a main agent, connected via the A2A protocol. This feature is enabled by `KAgentRemoteA2ATool` (`python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py`), kagent's custom replacement for the upstream `AgentTool(RemoteA2aAgent(...))` pairing. | ||
|
|
||
| It directly manages the A2A conversation with a remote subagent and adds three things the upstream lacks: HITL propagation, live activity viewing, and user ID forwarding. | ||
|
|
||
| See [human-in-the-loop.md](human-in-the-loop.md) for HITL details. | ||
|
|
||
| --- | ||
|
|
||
| ## How it works | ||
|
|
||
| Each parent A2A request creates a fresh `Runner` and fresh tool instances. `KAgentRemoteA2ATool.__init__` generates a UUID (`_last_context_id`) that is used as the A2A `context_id` for every message sent to the subagent. On the subagent side, this `context_id` becomes the session ID. | ||
|
|
||
| `run_async` has two phases: | ||
|
|
||
| - **Phase 1** (normal call): sends the request to the subagent and handles the response — returning the result, pausing for HITL if the subagent returns `input_required`, or returning an error string. | ||
| - **Phase 2** (HITL resume): reads the stored `task_id`/`context_id` from `tool_context.tool_confirmation.payload` and forwards the user's decision (approve / reject / batch / ask-user answers) to the subagent's pending task. | ||
|
|
||
| On success, `run_async` returns: | ||
| ```python | ||
| {"result": str, "subagent_session_id": str} # normal | ||
| {"result": str, "subagent_session_id": str, | ||
| "kagent_usage_metadata": dict} # with usage | ||
| {"status": "pending", "waiting_for": "subagent_approval", ...} # HITL pause | ||
| ``` | ||
|
|
||
| `KAgentRemoteA2AToolset` is a thin `BaseToolset` wrapper whose only job is ensuring the owned `httpx.AsyncClient` is closed when the runner shuts down — ADK's cleanup path only discovers `BaseToolset` instances, not bare `BaseTool` instances. | ||
|
|
||
| --- | ||
|
|
||
| ## User ID and session tagging | ||
|
|
||
| `_SubagentInterceptor` is registered on the A2A client at construction time and injects two headers on every outgoing request: | ||
|
|
||
| | Header | Value | Purpose | | ||
| |---|---|---| | ||
| | `x-user-id` | parent session's user ID | Scopes the subagent DB session to the same user | | ||
| | `x-kagent-source` | `"subagent"` | Hides the session from the agent's session history sidebar | | ||
|
|
||
| > Interceptors must be passed to `ClientFactory.create(interceptors=[...])` — `A2AClient.add_request_middleware()` appends to a list that the transport never reads. | ||
|
|
||
| On the subagent side, `KAgentRequestContextBuilder` reads these headers and passes them through to `_prepare_session`, which calls `KAgentSessionService.create_session()` with `source="subagent"`. The Go layer stores this in a `Source` column and excludes such sessions from `ListSessionsForAgent`. | ||
|
|
||
| --- | ||
|
|
||
| ## Live activity viewing | ||
|
|
||
| The UI can show what a subagent is doing in a live panel before it finishes. This works because the session ID is known before the tool runs: | ||
|
|
||
| Before the run loop, `A2aAgentExecutor` builds a `{tool_name → session_id}` map from all tools implementing the `SubagentSessionProvider` protocol (`subagent_session_id` property). The event converter stamps this as `kagent_subagent_session_id` metadata on each `function_call` DataPart as soon as the LLM emits the call. The UI reads it immediately and begins polling `/api/sessions/{id}` every 2 seconds, rendering the subagent's events as a nested chat thread. Nesting is capped at depth 3. | ||
|
|
||
| The map is keyed by tool name because within one parent request, all calls to the same subagent tool intentionally share one `context_id` — giving the subagent conversation continuity across sequential invocations. A fresh `context_id` is generated on the next parent request when the runner rebuilds. | ||
|
|
||
| When sending session requests to Go backend, take note that: | ||
|
|
||
| | Session query | Includes subagent sessions? | | ||
| |---|---| | ||
| | `GET /api/sessions/agent/{ns}/{name}` | No — filtered by `source != 'subagent'` | | ||
| | `GET /api/sessions/{id}` | Yes | | ||
| | `GET /api/sessions/{id}/tasks` | Yes | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,7 @@ | |
|
|
||
| import logging | ||
| import uuid | ||
| from typing import Any, Optional | ||
| from typing import Any, Optional, Protocol, runtime_checkable | ||
| from urllib.parse import urlparse | ||
|
|
||
| import httpx | ||
|
|
@@ -22,6 +22,7 @@ | |
| from a2a.client.client import ClientConfig as A2AClientConfig | ||
| from a2a.client.client_factory import ClientFactory as A2AClientFactory | ||
| from a2a.client.errors import A2AClientHTTPError | ||
| from a2a.client.middleware import ClientCallContext, ClientCallInterceptor | ||
| from a2a.types import ( | ||
| AgentCard, | ||
| DataPart, | ||
|
|
@@ -55,6 +56,27 @@ | |
|
|
||
| logger = logging.getLogger("kagent_adk." + __name__) | ||
|
|
||
| _USER_ID_CONTEXT_KEY = "x-user-id" | ||
| _SOURCE_HEADER = "x-kagent-source" | ||
| _SOURCE_SUBAGENT = "subagent" | ||
|
|
||
|
|
||
| class _SubagentInterceptor(ClientCallInterceptor): | ||
| """ | ||
| Injects the authenticated user's ID as an ``x-user-id`` HTTP header and | ||
| marks the request as originating from a subagent call via | ||
| ``x-kagent-source: subagent`` on every outgoing A2A request. | ||
| """ | ||
|
|
||
| async def intercept(self, method_name, request_payload, http_kwargs, agent_card, context): | ||
| headers = dict(http_kwargs.get("headers", {})) | ||
| # Always mark requests from a parent agent tool as subagent-originated | ||
| headers[_SOURCE_HEADER] = _SOURCE_SUBAGENT | ||
| if context and _USER_ID_CONTEXT_KEY in context.state: | ||
| headers["x-user-id"] = context.state[_USER_ID_CONTEXT_KEY] | ||
| http_kwargs["headers"] = headers | ||
| return request_payload, http_kwargs | ||
|
|
||
|
|
||
| def _extract_text_from_task(task: Task) -> str: | ||
| """Extract text content from a completed task's artifacts or status message.""" | ||
|
|
@@ -98,6 +120,17 @@ def _extract_usage_from_task(task: Task) -> Optional[dict]: | |
| return None | ||
|
|
||
|
|
||
| @runtime_checkable | ||
| class SubagentSessionProvider(Protocol): | ||
| """Protocol for tools that delegate to a subagent and can expose | ||
| the subagent's session ID for live activity polling.""" | ||
|
|
||
| name: str | ||
|
|
||
| @property | ||
| def subagent_session_id(self) -> str | None: ... | ||
|
|
||
|
|
||
| class KAgentRemoteA2ATool(BaseTool): | ||
| """A tool that calls a remote A2A agent and propagates HITL state.""" | ||
|
|
||
|
|
@@ -114,8 +147,13 @@ def __init__( | |
| self._httpx_client = httpx_client | ||
| self._a2a_client: Optional[A2AClient] = None | ||
| self._agent_card: Optional[AgentCard] = None | ||
| # Track the context_id from the remote agent for session continuity | ||
| self._last_context_id: Optional[str] = None | ||
| # Pre-generate context_id for UI session polling | ||
| self._last_context_id: str = str(uuid.uuid4()) | ||
|
|
||
| @property | ||
| def subagent_session_id(self) -> str | None: | ||
| """The subagent's session ID (== context_id sent in the A2A message).""" | ||
| return self._last_context_id | ||
|
Comment on lines
+150
to
+156
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tool instances are created fresh per request and destroyed after, so this |
||
|
|
||
| async def _ensure_client(self) -> A2AClient: | ||
| """Lazily resolve the agent card and initialize the A2A client.""" | ||
|
|
@@ -141,15 +179,18 @@ async def _ensure_client(self) -> A2AClient: | |
| if not self.description and self._agent_card.description: | ||
| self.description = self._agent_card.description | ||
|
|
||
| # Create the A2A client | ||
| # Create the A2A client. | ||
| config = A2AClientConfig( | ||
| httpx_client=self._httpx_client, | ||
| streaming=False, | ||
| polling=False, | ||
| supported_transports=[A2ATransport.jsonrpc], | ||
| ) | ||
| factory = A2AClientFactory(config=config) | ||
| self._a2a_client = factory.create(self._agent_card) | ||
| self._a2a_client = factory.create( | ||
| self._agent_card, | ||
| interceptors=[_SubagentInterceptor()], | ||
| ) | ||
| return self._a2a_client | ||
|
|
||
| def _get_declaration(self) -> genai_types.FunctionDeclaration: | ||
|
|
@@ -197,16 +238,17 @@ async def _handle_first_call(self, args: dict[str, Any], tool_context: ToolConte | |
| context_id=self._last_context_id, | ||
| ) | ||
|
|
||
| # Forward the authenticated user ID so the subagent session is scoped | ||
| # to the same user as the parent agent session. | ||
| call_context = ClientCallContext(state={_USER_ID_CONTEXT_KEY: tool_context.session.user_id}) | ||
|
|
||
| task: Optional[Task] = None | ||
| try: | ||
| async for response in client.send_message(request=message): | ||
| async for response in client.send_message(request=message, context=call_context): | ||
| if isinstance(response, tuple): | ||
| # ClientEvent: (Task, UpdateEvent | None) | ||
| task = response[0] | ||
| elif isinstance(response, A2AMessage): | ||
| # Direct message response (no task management) | ||
| if response.context_id: | ||
| self._last_context_id = response.context_id | ||
| return self._extract_text_from_message(response) | ||
| except A2AClientHTTPError as e: | ||
| return f"Remote agent '{self.name}' request failed: {e}" | ||
|
|
@@ -217,10 +259,6 @@ async def _handle_first_call(self, args: dict[str, Any], tool_context: ToolConte | |
| if task is None: | ||
| return f"Remote agent '{self.name}' returned no result." | ||
|
|
||
| # Track context_id for future requests to the same remote agent | ||
| if task.context_id: | ||
| self._last_context_id = task.context_id | ||
|
|
||
| state = task.status.state if task.status else None | ||
|
|
||
| if state == TaskState.input_required: | ||
|
|
@@ -235,8 +273,8 @@ async def _handle_first_call(self, args: dict[str, Any], tool_context: ToolConte | |
| result_text = _extract_text_from_task(task) | ||
| usage = _extract_usage_from_task(task) | ||
| if usage: | ||
| return {"result": result_text, "kagent_usage_metadata": usage} | ||
| return result_text or "" | ||
| return {"result": result_text, "kagent_usage_metadata": usage, "subagent_session_id": self._last_context_id} | ||
| return {"result": result_text or "", "subagent_session_id": self._last_context_id} | ||
|
|
||
| def _handle_input_required(self, task: Task, tool_context: ToolContext) -> dict[str, Any]: | ||
| """Handle a subagent that returned input_required (HITL). | ||
|
|
@@ -344,9 +382,10 @@ async def _handle_resume(self, tool_context: ToolContext) -> Any: | |
| ) | ||
|
|
||
| client = await self._ensure_client() | ||
| call_context = ClientCallContext(state={_USER_ID_CONTEXT_KEY: tool_context.session.user_id}) | ||
| task: Optional[Task] = None | ||
| try: | ||
| async for response in client.send_message(request=decision_message): | ||
| async for response in client.send_message(request=decision_message, context=call_context): | ||
| if isinstance(response, tuple): | ||
| task = response[0] | ||
| elif isinstance(response, A2AMessage): | ||
|
|
@@ -374,8 +413,13 @@ async def _handle_resume(self, tool_context: ToolContext) -> Any: | |
| result_text = _extract_text_from_task(task) | ||
| usage = _extract_usage_from_task(task) | ||
| if usage: | ||
| return {"result": result_text, "kagent_usage_metadata": usage} | ||
| return result_text or "" | ||
| return { | ||
| "result": result_text, | ||
| "kagent_usage_metadata": usage, | ||
| "subagent_session_id": context_id or self._last_context_id, | ||
| } | ||
| # context_id from the confirmation payload is the original subagent session ID in case of interrupts | ||
| return {"result": result_text, "subagent_session_id": context_id or self._last_context_id} | ||
|
|
||
| @staticmethod | ||
| def _extract_text_from_message(message: A2AMessage) -> str: | ||
|
|
@@ -416,6 +460,15 @@ def __init__( | |
| httpx_client=httpx_client, | ||
| ) | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| return self._tool.name | ||
|
|
||
| @property | ||
| def subagent_session_id(self) -> str | None: | ||
| """The subagent's session ID (== context_id sent in the A2A message).""" | ||
| return self._tool.subagent_session_id | ||
|
|
||
| async def get_tools(self, readonly_context: Optional[ReadonlyContext] = None) -> list[BaseTool]: | ||
| return [self._tool] | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds like an edge case, but when combined with the behaviour of the above comment, it's actually an intended feature. So the behaviour overall is:
User -> Main agent -> Subagent (multiple / parallel / sequential) -> they go into the same session for that subagent, the UI shows the same session -> each subagent return a response (existing behaviour) -> main agent
Now when user does another invocation of main agent:
User -> Main agent -> Runner gets recreated (existing behaviour) ->
KagentRemoteA2AToolgets reset + session ID reset -> subagent (new session) -> ...Because within each invocation the session ID for the same subagent is the same, therefore stamping the data part with the same ID mapped by name is not a problem. 😄
This is intended because when the main agent invokes the subagent sequentially, it's because it want to continue the conversation and the flow here is designed to support this.