From b3435ba7eead1909ec7c7fd0677da9bcca6b95f6 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Fri, 13 Mar 2026 13:38:04 -0400 Subject: [PATCH 1/8] feat: subagent viewing Signed-off-by: Jet Chiang userid propagation to subagents Signed-off-by: Jet Chiang subagents Signed-off-by: Jet Chiang --- go/api/database/models.go | 3 + go/api/httpapi/types.go | 2 + go/core/internal/database/client.go | 13 +- go/core/internal/database/fake/client.go | 6 +- .../internal/httpserver/handlers/sessions.go | 1 + .../src/kagent/adk/_agent_executor.py | 24 ++- .../src/kagent/adk/_remote_a2a_tool.py | 92 ++++++++--- .../src/kagent/adk/_session_service.py | 2 + .../kagent/adk/converters/event_converter.py | 38 ++++- .../src/kagent/core/a2a/_requests.py | 7 +- ui/src/app/actions/sessions.ts | 32 ++++ ui/src/components/chat/AgentCallDisplay.tsx | 143 +++++++++++++++++- ui/src/components/chat/ToolCallDisplay.tsx | 33 +++- ui/src/lib/messageHandlers.ts | 62 ++++++-- 14 files changed, 411 insertions(+), 47 deletions(-) diff --git a/go/api/database/models.go b/go/api/database/models.go index b7cfd80b2..16bb1a007 100644 --- a/go/api/database/models.go +++ b/go/api/database/models.go @@ -66,6 +66,9 @@ type Session struct { DeletedAt gorm.DeletedAt `gorm:"index" json:"deleted_at"` AgentID *string `gorm:"index" json:"agent_id"` + // Source indicates how this session was created. + // nil or empty = user-initiated, "subagent" = created by a parent agent's A2A call. + Source *string `gorm:"index" json:"source,omitempty"` } type Task struct { diff --git a/go/api/httpapi/types.go b/go/api/httpapi/types.go index 8679cc909..a657be0e2 100644 --- a/go/api/httpapi/types.go +++ b/go/api/httpapi/types.go @@ -103,6 +103,8 @@ type SessionRequest struct { AgentRef *string `json:"agent_ref,omitempty"` Name *string `json:"name,omitempty"` ID *string `json:"id,omitempty"` + // Source indicates how this session was created (e.g. "subagent"). + Source *string `json:"source,omitempty"` } // Run types diff --git a/go/core/internal/database/client.go b/go/core/internal/database/client.go index 7ae10078f..62d20215e 100644 --- a/go/core/internal/database/client.go +++ b/go/core/internal/database/client.go @@ -162,9 +162,16 @@ func (c *clientImpl) ListTasksForSession(ctx context.Context, sessionID string) } func (c *clientImpl) ListSessionsForAgent(ctx context.Context, agentID string, userID string) ([]dbpkg.Session, error) { - return list[dbpkg.Session](c.db.WithContext(ctx), - Clause{Key: "agent_id", Value: agentID}, - Clause{Key: "user_id", Value: userID}) + var sessions []dbpkg.Session + err := c.db.WithContext(ctx). + Where("agent_id = ? AND user_id = ?", agentID, userID). + Where("source IS NULL OR source != ?", "subagent"). + Order("created_at ASC"). + Find(&sessions).Error + if err != nil { + return nil, fmt.Errorf("failed to list sessions for agent: %w", err) + } + return sessions, nil } // ListSessions lists all sessions for a user diff --git a/go/core/internal/database/fake/client.go b/go/core/internal/database/fake/client.go index 57b69874c..658f6d889 100644 --- a/go/core/internal/database/fake/client.go +++ b/go/core/internal/database/fake/client.go @@ -336,7 +336,7 @@ func (c *InMemoryFakeClient) ListSessions(_ context.Context, userID string) ([]d return result, nil } -// ListSessionsForAgent lists all sessions for an agent +// ListSessionsForAgent lists all sessions for an agent, excluding subagent sessions. func (c *InMemoryFakeClient) ListSessionsForAgent(_ context.Context, agentID string, userID string) ([]database.Session, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -344,6 +344,10 @@ func (c *InMemoryFakeClient) ListSessionsForAgent(_ context.Context, agentID str var result []database.Session for _, session := range c.sessions { if session.AgentID != nil && *session.AgentID == agentID && session.UserID == userID { + // Exclude subagent sessions from the listing + if session.Source != nil && *session.Source == "subagent" { + continue + } result = append(result, *session) } } diff --git a/go/core/internal/httpserver/handlers/sessions.go b/go/core/internal/httpserver/handlers/sessions.go index a6bda8955..eb26e6714 100644 --- a/go/core/internal/httpserver/handlers/sessions.go +++ b/go/core/internal/httpserver/handlers/sessions.go @@ -136,6 +136,7 @@ func (h *SessionsHandler) HandleCreateSession(w ErrorResponseWriter, r *http.Req Name: sessionRequest.Name, UserID: userID, AgentID: &agent.ID, + Source: sessionRequest.Source, } log.V(1).Info("Creating session in database", diff --git a/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py b/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py index 034c51638..ac29ad9f9 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py +++ b/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py @@ -54,6 +54,7 @@ from ._mcp_toolset import is_anyio_cross_task_cancel_scope_error from .converters.event_converter import convert_event_to_a2a_events, serialize_metadata_value +from ._remote_a2a_tool import SubagentSessionProvider from .converters.part_converter import convert_a2a_part_to_genai_part, convert_genai_part_to_a2a_part from .converters.request_converter import convert_a2a_request_to_adk_run_args @@ -587,6 +588,13 @@ async def _handle_request( real_invocation_id: str | None = None last_usage_metadata = None + # Build a mapping of tool name -> subagent session ID once so the + # event converter can stamp it onto function_call DataParts. + subagent_session_ids: dict[str, str] = {} + for tool in getattr(runner.agent, "tools", None) or []: + if isinstance(tool, SubagentSessionProvider) and tool.subagent_session_id: + subagent_session_ids[tool.name] = tool.subagent_session_id + task_result_aggregator = TaskResultAggregator() async with Aclosing(runner.run_async(**run_args)) as agen: async for adk_event in agen: @@ -603,7 +611,11 @@ async def _handle_request( last_usage_metadata = adk_event.usage_metadata for a2a_event in convert_event_to_a2a_events( - adk_event, invocation_context, context.task_id, context.context_id + adk_event, + invocation_context, + context.task_id, + context.context_id, + subagent_session_ids=subagent_session_ids or None, ): # Only aggregate non-partial events to avoid duplicates from streaming chunks # Partial events are sent to frontend for display but not accumulated @@ -691,10 +703,18 @@ async def _prepare_session(self, context: RequestContext, run_args: dict[str, An session_name = text[:20] + ("..." if len(text) > 20 else "") break + state: dict[str, Any] = {"session_name": session_name} + # Propagate source (e.g. "subagent") so the session is tagged in the DB. + source = None + if context.call_context and context.call_context.state: + source = context.call_context.state.get("kagent_source") + if source: + state["source"] = source + session = await runner.session_service.create_session( app_name=runner.app_name, user_id=user_id, - state={"session_name": session_name}, + state=state, session_id=session_id, ) diff --git a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py index b3a2eb8ae..51c59b70c 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py +++ b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py @@ -13,7 +13,7 @@ import logging import uuid -from typing import Any, Optional +from typing import Any, Optional, Protocol, runtime_checkable from urllib.parse import urlparse import httpx @@ -22,6 +22,7 @@ from a2a.client.client import ClientConfig as A2AClientConfig from a2a.client.client_factory import ClientFactory as A2AClientFactory from a2a.client.errors import A2AClientHTTPError +from a2a.client.middleware import ClientCallContext, ClientCallInterceptor from a2a.types import ( AgentCard, DataPart, @@ -55,6 +56,27 @@ logger = logging.getLogger("kagent_adk." + __name__) +_USER_ID_CONTEXT_KEY = "x-user-id" +_SOURCE_HEADER = "x-kagent-source" +_SOURCE_SUBAGENT = "subagent" + + +class _SubagentInterceptor(ClientCallInterceptor): + """ + Injects the authenticated user's ID as an ``x-user-id`` HTTP header and + marks the request as originating from a subagent call via + ``x-kagent-source: subagent`` on every outgoing A2A request. + """ + + async def intercept(self, method_name, request_payload, http_kwargs, agent_card, context): + headers = dict(http_kwargs.get("headers", {})) + # Always mark requests from a parent agent tool as subagent-originated + headers[_SOURCE_HEADER] = _SOURCE_SUBAGENT + if context and _USER_ID_CONTEXT_KEY in context.state: + headers["x-user-id"] = context.state[_USER_ID_CONTEXT_KEY] + http_kwargs["headers"] = headers + return request_payload, http_kwargs + def _extract_text_from_task(task: Task) -> str: """Extract text content from a completed task's artifacts or status message.""" @@ -98,6 +120,17 @@ def _extract_usage_from_task(task: Task) -> Optional[dict]: return None +@runtime_checkable +class SubagentSessionProvider(Protocol): + """Protocol for tools that delegate to a subagent and can expose + the subagent's session ID for live activity polling.""" + + name: str + + @property + def subagent_session_id(self) -> str | None: ... + + class KAgentRemoteA2ATool(BaseTool): """A tool that calls a remote A2A agent and propagates HITL state.""" @@ -114,8 +147,16 @@ def __init__( self._httpx_client = httpx_client self._a2a_client: Optional[A2AClient] = None self._agent_card: Optional[AgentCard] = None - # Track the context_id from the remote agent for session continuity - self._last_context_id: Optional[str] = None + # Pre-generate context_id so the event converter can stamp it onto + # function_call events before the tool runs. The subagent uses + # context_id as its session_id, so knowing it early lets the UI + # start polling the subagent session during execution. + self._last_context_id: str = str(uuid.uuid4()) + + @property + def subagent_session_id(self) -> str | None: + """The subagent's session ID (== context_id sent in the A2A message).""" + return self._last_context_id async def _ensure_client(self) -> A2AClient: """Lazily resolve the agent card and initialize the A2A client.""" @@ -141,7 +182,12 @@ async def _ensure_client(self) -> A2AClient: if not self.description and self._agent_card.description: self.description = self._agent_card.description - # Create the A2A client + # Create the A2A client. + # IMPORTANT: pass the interceptor at construction time so it is + # registered on the transport's interceptor list. The SDK's + # add_request_middleware() only appends to Client._middleware which + # is never read by the transport — so interceptors added after + # construction are silently ignored. config = A2AClientConfig( httpx_client=self._httpx_client, streaming=False, @@ -149,7 +195,10 @@ async def _ensure_client(self) -> A2AClient: supported_transports=[A2ATransport.jsonrpc], ) factory = A2AClientFactory(config=config) - self._a2a_client = factory.create(self._agent_card) + self._a2a_client = factory.create( + self._agent_card, + interceptors=[_SubagentInterceptor()], + ) return self._a2a_client def _get_declaration(self) -> genai_types.FunctionDeclaration: @@ -197,16 +246,17 @@ async def _handle_first_call(self, args: dict[str, Any], tool_context: ToolConte context_id=self._last_context_id, ) + # Forward the authenticated user ID so the subagent session is scoped + # to the same user as the parent agent session. + call_context = ClientCallContext(state={_USER_ID_CONTEXT_KEY: tool_context.session.user_id}) + task: Optional[Task] = None try: - async for response in client.send_message(request=message): + async for response in client.send_message(request=message, context=call_context): if isinstance(response, tuple): # ClientEvent: (Task, UpdateEvent | None) task = response[0] elif isinstance(response, A2AMessage): - # Direct message response (no task management) - if response.context_id: - self._last_context_id = response.context_id return self._extract_text_from_message(response) except A2AClientHTTPError as e: return f"Remote agent '{self.name}' request failed: {e}" @@ -217,10 +267,6 @@ async def _handle_first_call(self, args: dict[str, Any], tool_context: ToolConte if task is None: return f"Remote agent '{self.name}' returned no result." - # Track context_id for future requests to the same remote agent - if task.context_id: - self._last_context_id = task.context_id - state = task.status.state if task.status else None if state == TaskState.input_required: @@ -235,8 +281,8 @@ async def _handle_first_call(self, args: dict[str, Any], tool_context: ToolConte result_text = _extract_text_from_task(task) usage = _extract_usage_from_task(task) if usage: - return {"result": result_text, "kagent_usage_metadata": usage} - return result_text or "" + return {"result": result_text, "kagent_usage_metadata": usage, "subagent_session_id": self._last_context_id} + return {"result": result_text or "", "subagent_session_id": self._last_context_id} def _handle_input_required(self, task: Task, tool_context: ToolContext) -> dict[str, Any]: """Handle a subagent that returned input_required (HITL). @@ -344,9 +390,10 @@ async def _handle_resume(self, tool_context: ToolContext) -> Any: ) client = await self._ensure_client() + call_context = ClientCallContext(state={_USER_ID_CONTEXT_KEY: tool_context.session.user_id}) task: Optional[Task] = None try: - async for response in client.send_message(request=decision_message): + async for response in client.send_message(request=decision_message, context=call_context): if isinstance(response, tuple): task = response[0] elif isinstance(response, A2AMessage): @@ -374,8 +421,8 @@ async def _handle_resume(self, tool_context: ToolContext) -> Any: result_text = _extract_text_from_task(task) usage = _extract_usage_from_task(task) if usage: - return {"result": result_text, "kagent_usage_metadata": usage} - return result_text or "" + return {"result": result_text, "kagent_usage_metadata": usage, "subagent_session_id": self._last_context_id} + return {"result": result_text or "", "subagent_session_id": self._last_context_id} @staticmethod def _extract_text_from_message(message: A2AMessage) -> str: @@ -416,6 +463,15 @@ def __init__( httpx_client=httpx_client, ) + @property + def name(self) -> str: + return self._tool.name + + @property + def subagent_session_id(self) -> str | None: + """The subagent's session ID (== context_id sent in the A2A message).""" + return self._tool.subagent_session_id + async def get_tools(self, readonly_context: Optional[ReadonlyContext] = None) -> list[BaseTool]: return [self._tool] diff --git a/python/packages/kagent-adk/src/kagent/adk/_session_service.py b/python/packages/kagent-adk/src/kagent/adk/_session_service.py index a339cbb3f..da08895a5 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_session_service.py +++ b/python/packages/kagent-adk/src/kagent/adk/_session_service.py @@ -42,6 +42,8 @@ async def create_session( request_data["id"] = session_id if state and state.get("session_name"): request_data["name"] = state.get("session_name", "") + if state and state.get("source"): + request_data["source"] = state.get("source", "") # Make API call to create session response = await self.client.post( diff --git a/python/packages/kagent-adk/src/kagent/adk/converters/event_converter.py b/python/packages/kagent-adk/src/kagent/adk/converters/event_converter.py index a8b7427b8..50432b645 100644 --- a/python/packages/kagent-adk/src/kagent/adk/converters/event_converter.py +++ b/python/packages/kagent-adk/src/kagent/adk/converters/event_converter.py @@ -134,8 +134,34 @@ def _process_long_running_tool(a2a_part: A2APart, event: Event) -> None: a2a_part.root.metadata[get_kagent_metadata_key(A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY)] = True +def _process_subagent_session_id(a2a_part: A2APart, subagent_session_ids: Dict[str, str]) -> None: + """Stamps a subagent session ID onto a function_call DataPart. + + If the part is a function_call whose tool name appears in + ``subagent_session_ids``, the corresponding session ID is added to + the DataPart metadata so the UI can find the subagent session. + + Args: + a2a_part: The A2A part to potentially stamp. + subagent_session_ids: Mapping of tool name to pre-generated session ID. + """ + if not isinstance(a2a_part.root, DataPart) or not a2a_part.root.metadata: + return + if ( + a2a_part.root.metadata.get(get_kagent_metadata_key(A2A_DATA_PART_METADATA_TYPE_KEY)) + != A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL + ): + return + tool_name = a2a_part.root.data.get("name") if isinstance(a2a_part.root.data, dict) else None + if tool_name and tool_name in subagent_session_ids: + a2a_part.root.metadata[get_kagent_metadata_key("subagent_session_id")] = subagent_session_ids[tool_name] + + def convert_event_to_a2a_message( - event: Event, invocation_context: InvocationContext, role: Role = Role.agent + event: Event, + invocation_context: InvocationContext, + role: Role = Role.agent, + subagent_session_ids: Optional[Dict[str, str]] = None, ) -> Optional[Message]: """Converts an ADK event to an A2A message. @@ -143,6 +169,9 @@ def convert_event_to_a2a_message( event: The ADK event to convert. invocation_context: The invocation context. role: The role attribute for the message (default: Role.agent). + subagent_session_ids: Optional mapping of tool name to pre-generated + subagent session ID. When provided, function_call DataParts for + matching tools will have the session ID stamped into their metadata. Returns: An A2A Message if the event has content, None otherwise. @@ -165,6 +194,8 @@ def convert_event_to_a2a_message( if a2a_part: a2a_parts.append(a2a_part) _process_long_running_tool(a2a_part, event) + if subagent_session_ids: + _process_subagent_session_id(a2a_part, subagent_session_ids) if a2a_parts: message_metadata = _get_context_metadata(event, invocation_context) @@ -280,6 +311,7 @@ def convert_event_to_a2a_events( invocation_context: InvocationContext, task_id: Optional[str] = None, context_id: Optional[str] = None, + subagent_session_ids: Optional[Dict[str, str]] = None, ) -> List[A2AEvent]: """Converts a GenAI event to a list of A2A events. @@ -288,6 +320,8 @@ def convert_event_to_a2a_events( invocation_context: The invocation context. task_id: Optional task ID to use for generated events. context_id: Optional Context ID to use for generated events. + subagent_session_ids: Optional mapping of tool name to pre-generated + subagent session ID, threaded to ``convert_event_to_a2a_message``. Returns: A list of A2A events representing the converted ADK event. @@ -309,7 +343,7 @@ def convert_event_to_a2a_events( a2a_events.append(error_event) # Handle regular message content - message = convert_event_to_a2a_message(event, invocation_context) + message = convert_event_to_a2a_message(event, invocation_context, subagent_session_ids=subagent_session_ids) if message: running_event = _create_status_update_event(message, invocation_context, event, task_id, context_id) a2a_events.append(running_event) diff --git a/python/packages/kagent-core/src/kagent/core/a2a/_requests.py b/python/packages/kagent-core/src/kagent/core/a2a/_requests.py index 47acb5171..be9abe211 100644 --- a/python/packages/kagent-core/src/kagent/core/a2a/_requests.py +++ b/python/packages/kagent-core/src/kagent/core/a2a/_requests.py @@ -42,10 +42,15 @@ async def build( context: ServerCallContext | None = None, ) -> RequestContext: if context: - # grab the user id from the header headers = context.state.get("headers", {}) + # Extract the authenticated user ID forwarded by the parent agent user_id = headers.get("x-user-id", None) if user_id: context.user = KAgentUser(user_id=user_id) + # Propagate x-kagent-source so downstream code (e.g. session + # creation) can tag this session as subagent-originated. + source = headers.get("x-kagent-source", None) + if source: + context.state["kagent_source"] = source request_context = await super().build(params, task_id, context_id, task, context) return request_context diff --git a/ui/src/app/actions/sessions.ts b/ui/src/app/actions/sessions.ts index b1fa136d5..a25d97525 100644 --- a/ui/src/app/actions/sessions.ts +++ b/ui/src/app/actions/sessions.ts @@ -90,6 +90,38 @@ export async function getSessionTasks(sessionId: string): Promise> { + try { + // fetchApi appends user_id=admin@kagent.dev automatically. + // With the x-user-id passthrough fix, subagent sessions are now + // created under the same user as the parent, so a single fetch works. + const [sessionResp, tasksResp] = await Promise.all([ + fetchApi>(`/sessions/${sessionId}`), + fetchApi>(`/sessions/${sessionId}/tasks`), + ]); + + const session = sessionResp.data?.session; + if (!session) { + return { message: "Subagent session not found" }; + } + return { + message: "Session with events fetched successfully", + data: { session, tasks: tasksResp.data ?? [] }, + }; + } catch (error) { + return createErrorResponse<{ session: Session; tasks: Task[] }>(error, "Error fetching session with events"); + } +} + /** * Check if a session exists * @param sessionId The session ID to check diff --git a/ui/src/components/chat/AgentCallDisplay.tsx b/ui/src/components/chat/AgentCallDisplay.tsx index ee520b5e7..58fcb1400 100644 --- a/ui/src/components/chat/AgentCallDisplay.tsx +++ b/ui/src/components/chat/AgentCallDisplay.tsx @@ -1,10 +1,19 @@ -import { useMemo, useState } from "react"; +import { createContext, useContext, useMemo, useState, useEffect } from "react"; import { FunctionCall, TokenStats } from "@/types"; import { Card, CardHeader, CardTitle, CardContent } from "@/components/ui/card"; import { convertToUserFriendlyName } from "@/lib/utils"; -import { ChevronDown, ChevronUp, MessageSquare, Loader2, AlertCircle, CheckCircle } from "lucide-react"; +import { ChevronDown, ChevronUp, MessageSquare, Loader2, AlertCircle, CheckCircle, Activity } from "lucide-react"; import KagentLogo from "../kagent-logo"; import TokenStatsTooltip from "@/components/chat/TokenStatsTooltip"; +import { getSubagentSessionWithEvents } from "@/app/actions/sessions"; +import { Message, Task } from "@a2a-js/sdk"; +import { extractMessagesFromTasks } from "@/lib/messageHandlers"; +import ChatMessage from "@/components/chat/ChatMessage"; + +// Track and avoid too deep nested agent viewing to avoid UI issues +// In theory this works for infinite depth +const MAX_ACTIVITY_DEPTH = 3; +const ActivityDepthContext = createContext(0); export type AgentCallStatus = "requested" | "executing" | "completed"; @@ -17,14 +26,120 @@ interface AgentCallDisplayProps { status?: AgentCallStatus; isError?: boolean; tokenStats?: TokenStats; + subagentSessionId?: string; +} + +// --------------------------------------------------------------------------- +// SubagentActivityPanel — inline collapsible inside the agent call card +// Polls for new events while the subagent is still running. +// --------------------------------------------------------------------------- +interface SubagentActivityPanelProps { + sessionId: string; + isComplete: boolean; +} + +function SubagentActivityPanel({ sessionId, isComplete }: SubagentActivityPanelProps) { + const activityDepth = useContext(ActivityDepthContext); + const [messages, setMessages] = useState([]); + const [error, setError] = useState(null); + const [waiting, setWaiting] = useState(true); + + console.log(`[SubagentActivityPanel] depth=${activityDepth} sessionId=${sessionId} isComplete=${isComplete}`); + + useEffect(() => { + let cancelled = false; + let timeoutId: ReturnType; + + const fetchEvents = async () => { + try { + const resp = await getSubagentSessionWithEvents(sessionId); + if (cancelled) return; + if (resp.error || !resp.data) { + // Treat 404 / empty responses as "not ready yet" — the subagent + // session may not exist in the DB until it processes the request. + if (!isComplete) { + setWaiting(true); + } else { + setError(resp.error || "Failed to load subagent activity."); + } + } else { + const tasks: Task[] = resp.data.tasks; + const extracted = extractMessagesFromTasks(tasks); + setMessages(extracted); + setWaiting(extracted.length === 0 && !isComplete); + setError(null); + } + } catch { + // Network errors during polling are expected (e.g. session not + // created yet). Only surface as a real error once the subagent + // has completed and we still can't fetch. + if (!cancelled && isComplete) { + setError("Failed to load subagent activity."); + } + } + + // Keep polling while subagent is still running + if (!cancelled && !isComplete) { + timeoutId = setTimeout(fetchEvents, 2000); + } + }; + + fetchEvents(); + return () => { cancelled = true; clearTimeout(timeoutId); }; + }, [sessionId, isComplete]); + + if (error) { + return ( +
+ + {error} +
+ ); + } + + if (waiting && messages.length === 0) { + return ( +
+ + Checking subagent activity... +
+ ); + } + + if (messages.length === 0) { + return ( +

No activity recorded for this session.

+ ); + } + + return ( +
+ {messages.map((msg) => ( + + ))} +
+ ); } -const AgentCallDisplay = ({ call, result, status = "requested", isError = false, tokenStats }: AgentCallDisplayProps) => { +// --------------------------------------------------------------------------- +// AgentCallDisplay +// --------------------------------------------------------------------------- +const AgentCallDisplay = ({ call, result, status = "requested", isError = false, tokenStats, subagentSessionId }: AgentCallDisplayProps) => { const [areInputsExpanded, setAreInputsExpanded] = useState(false); const [areResultsExpanded, setAreResultsExpanded] = useState(false); + const [activityExpanded, setActivityExpanded] = useState(false); + const activityDepth = useContext(ActivityDepthContext); const agentDisplay = useMemo(() => convertToUserFriendlyName(call.name), [call.name]); const hasResult = result !== undefined; + const showActivitySection = !!subagentSessionId && !isError && activityDepth < MAX_ACTIVITY_DEPTH; + + console.log(`[AgentCallDisplay] depth=${activityDepth} agent=${call.name} sessionId=${subagentSessionId ?? "none"} status=${status} showActivity=${showActivitySection}`); const getStatusDisplay = () => { if (isError && status === "executing") { @@ -123,11 +238,29 @@ const AgentCallDisplay = ({ call, result, status = "requested", isError = false, )} + + {showActivitySection && ( +
+ + {activityExpanded && ( + +
+ +
+
+ )} +
+ )} ); }; export default AgentCallDisplay; - - diff --git a/ui/src/components/chat/ToolCallDisplay.tsx b/ui/src/components/chat/ToolCallDisplay.tsx index 97bd07486..c048748b4 100644 --- a/ui/src/components/chat/ToolCallDisplay.tsx +++ b/ui/src/components/chat/ToolCallDisplay.tsx @@ -3,7 +3,7 @@ import { Message, TextPart } from "@a2a-js/sdk"; import ToolDisplay, { ToolCallStatus } from "@/components/ToolDisplay"; import AgentCallDisplay, { AgentCallStatus } from "@/components/chat/AgentCallDisplay"; import { isAgentToolName } from "@/lib/utils"; -import { ADKMetadata, ProcessedToolResultData, ToolResponseData, normalizeToolResultToText, getMetadataValue } from "@/lib/messageHandlers"; +import { ADKMetadata, ProcessedToolCallData, ProcessedToolResultData, ToolResponseData, normalizeToolResultToText, getMetadataValue } from "@/lib/messageHandlers"; import { FunctionCall, ToolDecision, TokenStats } from "@/types"; interface ToolCallDisplayProps { @@ -22,6 +22,7 @@ interface ToolCallState { is_error?: boolean; }; status: ToolCallStatus; + subagentSessionId?: string; } // Helper functions to work with A2A SDK Messages @@ -129,14 +130,23 @@ const extractToolCallResults = (message: Message): ProcessedToolResultData[] => if (part.metadata) { if (getMetadataValue(part.metadata as Record, "type") === "function_response") { const data = part.data as unknown as ToolResponseData; - // Extract normalized content from the result (supports string/object/array) - const textContent = normalizeToolResultToText(data); + + // For agent tool responses we receive { result, subagent_session_id } as FunctionResponse.response. + let textContent = normalizeToolResultToText(data); + let subagentSessionId: string | undefined; + if (isAgentToolName(data.name)) { + const responseObj = data.response as Record | undefined; + if (responseObj && typeof responseObj.subagent_session_id === "string") { + subagentSessionId = responseObj.subagent_session_id; + } + } toolResults.push({ call_id: data.id, name: data.name, content: textContent, is_error: data.response?.isError || false, + ...(subagentSessionId ? { subagent_session_id: subagentSessionId } : {}), }); } } @@ -184,20 +194,20 @@ const ToolCallDisplay = ({ currentMessage, allMessages, onApprove, onReject, pen } const ownedIds = new Set(currentRequests.map(r => r.id).filter(id => id !== undefined) as string[]); - + // Scan backwards from our index to see if any earlier message already has these IDs. // This avoids a full O(N) scan per component render by aborting early. for (let i = currentIndex - 1; i >= 0; i--) { const msg = allMessages[i]; if (!isToolCallRequestMessage(msg)) continue; - + const prevRequests = extractToolCallRequests(msg); for (const pr of prevRequests) { if (pr.id) { ownedIds.delete(pr.id); } } - + if (ownedIds.size === 0) break; // Early exit if all IDs were claimed by earlier messages } return ownedIds; @@ -239,10 +249,15 @@ const ToolCallDisplay = ({ currentMessage, allMessages, onApprove, onReject, pen initialStatus = "pending_approval"; } } + // Extract subagent_session_id from ProcessedToolCallData in metadata + const toolCallData = msgMetadata?.toolCallData as ProcessedToolCallData[] | undefined; + const matchingCallData = toolCallData?.find(tc => tc.id === request.id); + newToolCalls.set(request.id, { id: request.id, call: request, status: initialStatus, + subagentSessionId: matchingCallData?.subagent_session_id, }); } } @@ -263,6 +278,11 @@ const ToolCallDisplay = ({ currentMessage, allMessages, onApprove, onReject, pen content: result.content, is_error: result.is_error }; + if (result.subagent_session_id && !existingCall.subagentSessionId) { + // Only set from function_response if the 1st pass (function_call + // metadata) didn't already provide it. + existingCall.subagentSessionId = result.subagent_session_id; + } if (!isHitlTerminal(existingCall.status)) { existingCall.status = "executing"; } @@ -331,6 +351,7 @@ const ToolCallDisplay = ({ currentMessage, allMessages, onApprove, onReject, pen status={effectiveStatus === "pending_approval" ? "requested" : effectiveStatus as AgentCallStatus} isError={toolCall.result?.is_error} tokenStats={tokenStats} + subagentSessionId={toolCall.subagentSessionId} /> ) : ( ; + subagent_session_id?: string; } export interface ProcessedToolResultData { @@ -476,6 +477,7 @@ export interface ProcessedToolResultData { name: string; content: string; is_error: boolean; + subagent_session_id?: string; } // Normalize various tool response result shapes into plain text @@ -664,7 +666,7 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { contextId: string | undefined, taskId: string | undefined, source: string, - options?: { setProcessingStatus?: boolean; tokenStats?: TokenStats } + options?: { setProcessingStatus?: boolean; tokenStats?: TokenStats; subagentSessionId?: string } ) => { if (options?.setProcessingStatus && handlers.setChatStatus) { handlers.setChatStatus("processing_tools"); @@ -672,7 +674,8 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { const toolCallContent: ProcessedToolCallData[] = [{ id: toolData.id, name: toolData.name, - args: toolData.args || {} + args: toolData.args || {}, + ...(options?.subagentSessionId ? { subagent_session_id: options.subagentSessionId } : {}), }]; const convertedMessage = createMessage( "", @@ -693,11 +696,22 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { taskId: string | undefined, defaultSource: string ) => { + let content = normalizeToolResultToText(toolData); + let subagentSessionId: string | undefined; + + if (isAgentToolName(toolData.name)) { + const responseObj = toolData.response as Record | undefined; + if (responseObj && typeof responseObj.subagent_session_id === "string") { + subagentSessionId = responseObj.subagent_session_id; + } + } + const toolResultContent: ProcessedToolResultData[] = [{ call_id: toolData.id, name: toolData.name, - content: normalizeToolResultToText(toolData), + content, is_error: toolData.response?.isError || false, + ...(subagentSessionId ? { subagent_session_id: subagentSessionId } : {}), }]; const execEvent = createMessage( "", @@ -738,7 +752,7 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { const isUserMessage = (message: Message): boolean => message.role === "user"; // Simple fallback source when metadata is not available - const defaultAgentSource = handlers.agentContext + const defaultAgentSource = handlers.agentContext ? `${handlers.agentContext.namespace}/${handlers.agentContext.agentName.replace(/_/g, "-")}` : "assistant"; @@ -865,10 +879,17 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { } const toolData = data as unknown as ToolCallData; const source = getSourceFromMetadata(adkMetadata, defaultAgentSource); + + // Extract subagent_session_id from DataPart metadata for agent tools + let subagentSessionId: string | undefined; + if (fcName && isAgentToolName(fcName)) { + subagentSessionId = getMetadataValue(partMetadata as Record, "subagent_session_id"); + } + // Don't stamp AgentCall cards with the parent invocation's stats — // those belong on the confirmation dialog. The AgentCall card gets // its own stats from the child agent's function_response. - processFunctionCallPart(toolData, statusUpdate.contextId, statusUpdate.taskId, source, { setProcessingStatus: true, tokenStats: isAgentToolName(toolData.name) ? undefined : turnStats }); + processFunctionCallPart(toolData, statusUpdate.contextId, statusUpdate.taskId, source, { setProcessingStatus: true, tokenStats: isAgentToolName(toolData.name) ? undefined : turnStats, subagentSessionId }); } else if (partType === "function_response") { // Skip internal HITL markers: the before_tool_callback stub and @@ -924,7 +945,17 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { const partType = getMetadataValue(partMetadata as Record, "type"); if (partType === "function_call") { const toolData = data as unknown as ToolCallData; - const toolCallContent: ProcessedToolCallData[] = [{ id: toolData.id, name: toolData.name, args: toolData.args || {} }]; + // Extract subagent_session_id from DataPart metadata for agent tools + let artifactFcSubagentSessionId: string | undefined; + if (isAgentToolName(toolData.name)) { + artifactFcSubagentSessionId = getMetadataValue(partMetadata as Record, "subagent_session_id"); + } + const toolCallContent: ProcessedToolCallData[] = [{ + id: toolData.id, + name: toolData.name, + args: toolData.args || {}, + ...(artifactFcSubagentSessionId ? { subagent_session_id: artifactFcSubagentSessionId } : {}), + }]; const convertedMessage = createMessage("", source, { originalType: "ToolCallRequestEvent", contextId: artifactUpdate.contextId, taskId: artifactUpdate.taskId, additionalMetadata: { toolCallData: toolCallContent, ...(turnStats && { tokenStats: turnStats }) } }); convertedMessages.push(convertedMessage); continue; @@ -933,7 +964,20 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { if (partType === "function_response") { const toolData = data as unknown as ToolResponseData; const textContent = normalizeToolResultToText(toolData); - const toolResultContent: ProcessedToolResultData[] = [{ call_id: toolData.id, name: toolData.name, content: textContent, is_error: toolData.response?.isError || false }]; + let artifactSubagentSessionId: string | undefined; + if (isAgentToolName(toolData.name)) { + const responseObj = toolData.response as Record | undefined; + if (responseObj && typeof responseObj.subagent_session_id === "string") { + artifactSubagentSessionId = responseObj.subagent_session_id; + } + } + const toolResultContent: ProcessedToolResultData[] = [{ + call_id: toolData.id, + name: toolData.name, + content: textContent, + is_error: toolData.response?.isError || false, + ...(artifactSubagentSessionId ? { subagent_session_id: artifactSubagentSessionId } : {}), + }]; const convertedMessage = createMessage("", source, { originalType: "ToolCallExecutionEvent", contextId: artifactUpdate.contextId, taskId: artifactUpdate.taskId, additionalMetadata: { toolResultData: toolResultContent } }); convertedMessages.push(convertedMessage); continue; @@ -979,7 +1023,7 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { if (convertedMessages.length > 0) { handlers.setMessages(prevMessages => [...prevMessages, ...convertedMessages]); } - + // Add a tool call summary message to mark any pending tool calls as completed const summarySource = getSourceFromMetadata(adkMetadata, defaultAgentSource); const toolSummaryMessage = createMessage( @@ -1051,4 +1095,4 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { return { handleMessageEvent }; -}; +}; From ede4a1e2c9d9427c6cc1747262572dee0de662f9 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Tue, 17 Mar 2026 01:21:30 -0400 Subject: [PATCH 2/8] subagent fixes Signed-off-by: Jet Chiang You are currently rebasing branch 'jetc/feat/subagent-viewing' on '802f95be'. --- .../kagent-adk/src/kagent/adk/_remote_a2a_tool.py | 4 ++-- ui/src/components/chat/AgentCallDisplay.tsx | 5 ----- ui/src/components/chat/ToolCallDisplay.tsx | 15 ++++++++++++++- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py index 51c59b70c..9c5b6a910 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py +++ b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py @@ -421,8 +421,8 @@ async def _handle_resume(self, tool_context: ToolContext) -> Any: result_text = _extract_text_from_task(task) usage = _extract_usage_from_task(task) if usage: - return {"result": result_text, "kagent_usage_metadata": usage, "subagent_session_id": self._last_context_id} - return {"result": result_text or "", "subagent_session_id": self._last_context_id} + return {"result": result_text, "kagent_usage_metadata": usage, "subagent_session_id": context_id or self._last_context_id} + return {"result": result_text or "", "subagent_session_id": context_id or self._last_context_id} @staticmethod def _extract_text_from_message(message: A2AMessage) -> str: diff --git a/ui/src/components/chat/AgentCallDisplay.tsx b/ui/src/components/chat/AgentCallDisplay.tsx index 58fcb1400..f667e5cde 100644 --- a/ui/src/components/chat/AgentCallDisplay.tsx +++ b/ui/src/components/chat/AgentCallDisplay.tsx @@ -39,13 +39,10 @@ interface SubagentActivityPanelProps { } function SubagentActivityPanel({ sessionId, isComplete }: SubagentActivityPanelProps) { - const activityDepth = useContext(ActivityDepthContext); const [messages, setMessages] = useState([]); const [error, setError] = useState(null); const [waiting, setWaiting] = useState(true); - console.log(`[SubagentActivityPanel] depth=${activityDepth} sessionId=${sessionId} isComplete=${isComplete}`); - useEffect(() => { let cancelled = false; let timeoutId: ReturnType; @@ -139,8 +136,6 @@ const AgentCallDisplay = ({ call, result, status = "requested", isError = false, const hasResult = result !== undefined; const showActivitySection = !!subagentSessionId && !isError && activityDepth < MAX_ACTIVITY_DEPTH; - console.log(`[AgentCallDisplay] depth=${activityDepth} agent=${call.name} sessionId=${subagentSessionId ?? "none"} status=${status} showActivity=${showActivitySection}`); - const getStatusDisplay = () => { if (isError && status === "executing") { return ( diff --git a/ui/src/components/chat/ToolCallDisplay.tsx b/ui/src/components/chat/ToolCallDisplay.tsx index c048748b4..8339ee9c6 100644 --- a/ui/src/components/chat/ToolCallDisplay.tsx +++ b/ui/src/components/chat/ToolCallDisplay.tsx @@ -253,11 +253,24 @@ const ToolCallDisplay = ({ currentMessage, allMessages, onApprove, onReject, pen const toolCallData = msgMetadata?.toolCallData as ProcessedToolCallData[] | undefined; const matchingCallData = toolCallData?.find(tc => tc.id === request.id); + // For agent tools, resolve the subagent session ID. + let subagentSessionId: string | undefined = matchingCallData?.subagent_session_id; + if (!subagentSessionId && isAgentToolName(request.name)) { + const fcDataPart = message.parts?.find(p => + p.kind === "data" && p.metadata && + getMetadataValue(p.metadata as Record, "type") === "function_call" && + (p.data as Record)?.id === request.id + ); + subagentSessionId = fcDataPart?.metadata + ? getMetadataValue(fcDataPart.metadata as Record, "subagent_session_id") + : undefined; + } + newToolCalls.set(request.id, { id: request.id, call: request, status: initialStatus, - subagentSessionId: matchingCallData?.subagent_session_id, + subagentSessionId, }); } } From 014c87108661aa9000fb834dea82b00077d9c2a5 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Tue, 17 Mar 2026 01:21:41 -0400 Subject: [PATCH 3/8] docs Signed-off-by: Jet Chiang --- docs/architecture/subagent-viewing.md | 82 +++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 docs/architecture/subagent-viewing.md diff --git a/docs/architecture/subagent-viewing.md b/docs/architecture/subagent-viewing.md new file mode 100644 index 000000000..825b479a6 --- /dev/null +++ b/docs/architecture/subagent-viewing.md @@ -0,0 +1,82 @@ +# Subagent Live Activity Viewing + +Allows users to see what a subagent is doing **during and after** a parent agent's execution, without leaving the parent chat. An "Activity" panel appears inline on each subagent tool call card. + +--- + +## Overview + +When a parent agent delegates to a subagent via `KAgentRemoteA2ATool`, the subagent's session is created in the database under the same user. The UI polls that session for live events and renders them as a nested chat thread inside the parent conversation. + +--- + +## Request Flow + +### 1. Session ID stamping (Python, parent agent) + +`KAgentRemoteA2ATool` pre-generates a `context_id` (UUID) in `__init__` before the tool runs. The event converter stamps this ID as `adk_subagent_session_id` metadata onto the `function_call` DataPart, so the UI knows the subagent session ID as soon as the LLM emits the call — before the tool actually executes. + +### 2. User ID passthrough (Python, A2A interceptor) + +`_SubagentInterceptor` is registered at A2A client construction time. It injects two headers on every outgoing A2A request: + +| Header | Value | Purpose | +|--------|-------|---------| +| `x-user-id` | parent session's `user_id` | Scopes the subagent session to the same DB user | +| `x-kagent-source` | `subagent` | Tags the session as subagent-originated | + +> **Note on A2A SDK:** `A2AClient.add_request_middleware()` appends to `Client._middleware`, which is never read by the transport. Interceptors must be passed to `ClientFactory.create(interceptors=[...])` at construction time to be registered on `JsonRpcTransport.interceptors`. + +### 3. Subagent session creation (Python, subagent runtime) + +The subagent's `KAgentRequestContextBuilder` reads both headers from `context.state["headers"]` (populated automatically by the A2A SDK's `DefaultCallContextBuilder`): + +- `x-user-id` → sets `context.user = KAgentUser(user_id=...)` so the session is stored under the correct user +- `x-kagent-source` → stored in `context.state["kagent_source"]` + +`_prepare_session` reads `kagent_source` directly from the request context. The value is threaded through `state["source"]` → `KAgentSessionService.create_session()` → Go `POST /api/sessions` with `"source": "subagent"` in the body. + +### 4. Session storage (Go) + +The `Session` model has a `Source *string` column (`"subagent"` or `nil`). `ListSessionsForAgent` excludes subagent sessions (`WHERE source IS NULL OR source != 'subagent'`), so they don't appear in the agent's session history sidebar. `GetSession` is unfiltered — the UI can still fetch them by ID. + +### 5. UI polling + +`AgentCallDisplay` shows an "Activity" button when `subagentSessionId` is present. Clicking it opens `SubagentActivityPanel`, which polls `getSubagentSessionWithEvents(sessionId)` every 2 seconds while the subagent is running, stopping once `isComplete`. + +Depth is tracked via `ActivityDepthContext` (max depth: 3) to prevent unbounded nesting. + +--- + +## Data Flow Summary + +``` +Parent LLM emits function_call for subagent tool + → event_converter stamps adk_subagent_session_id on DataPart + → UI extracts sessionId from DataPart metadata + → "Activity" button appears on AgentCallDisplay + +KAgentRemoteA2ATool executes + → _SubagentInterceptor adds x-user-id + x-kagent-source: subagent + → A2A request → subagent pod + +Subagent pod receives request + → KAgentRequestContextBuilder extracts headers + → _prepare_session creates session in DB: + user_id = admin@kagent.dev (from x-user-id) + source = "subagent" (from x-kagent-source) + +UI SubagentActivityPanel polls /api/sessions/{id} + /api/sessions/{id}/tasks + → Renders subagent's messages as nested chat thread + → Stops polling once parent function_response received +``` + +--- + +## Session Visibility + +| Query | Includes subagent sessions? | +|-------|----------------------------| +| `GET /api/sessions/agent/{ns}/{name}` (session history sidebar) | No — filtered by `source != 'subagent'` | +| `GET /api/sessions/{id}` (direct fetch by ID) | Yes — unfiltered | +| `GET /api/sessions/{id}/tasks` | Yes — unfiltered | From c9e1627ee6a20a7f92eb494fa7e8c9c9283d1dac Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Tue, 17 Mar 2026 01:29:18 -0400 Subject: [PATCH 4/8] cleanup Signed-off-by: Jet Chiang --- .../kagent-adk/src/kagent/adk/_remote_a2a_tool.py | 13 +++---------- ui/src/app/actions/sessions.ts | 4 ---- ui/src/components/chat/AgentCallDisplay.tsx | 7 ------- 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py index 9c5b6a910..40b041fc8 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py +++ b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py @@ -147,10 +147,7 @@ def __init__( self._httpx_client = httpx_client self._a2a_client: Optional[A2AClient] = None self._agent_card: Optional[AgentCard] = None - # Pre-generate context_id so the event converter can stamp it onto - # function_call events before the tool runs. The subagent uses - # context_id as its session_id, so knowing it early lets the UI - # start polling the subagent session during execution. + # Pre-generate context_id for UI session polling self._last_context_id: str = str(uuid.uuid4()) @property @@ -183,11 +180,6 @@ async def _ensure_client(self) -> A2AClient: self.description = self._agent_card.description # Create the A2A client. - # IMPORTANT: pass the interceptor at construction time so it is - # registered on the transport's interceptor list. The SDK's - # add_request_middleware() only appends to Client._middleware which - # is never read by the transport — so interceptors added after - # construction are silently ignored. config = A2AClientConfig( httpx_client=self._httpx_client, streaming=False, @@ -422,7 +414,8 @@ async def _handle_resume(self, tool_context: ToolContext) -> Any: usage = _extract_usage_from_task(task) if usage: return {"result": result_text, "kagent_usage_metadata": usage, "subagent_session_id": context_id or self._last_context_id} - return {"result": result_text or "", "subagent_session_id": context_id or self._last_context_id} + # context_id from the confirmation payload is the original subagent session ID in case of interrupts + return {"result": result_text, "subagent_session_id": context_id or self._last_context_id} @staticmethod def _extract_text_from_message(message: A2AMessage) -> str: diff --git a/ui/src/app/actions/sessions.ts b/ui/src/app/actions/sessions.ts index a25d97525..66262343c 100644 --- a/ui/src/app/actions/sessions.ts +++ b/ui/src/app/actions/sessions.ts @@ -92,8 +92,6 @@ export async function getSessionTasks(sessionId: string): Promise> { try { // fetchApi appends user_id=admin@kagent.dev automatically. - // With the x-user-id passthrough fix, subagent sessions are now - // created under the same user as the parent, so a single fetch works. const [sessionResp, tasksResp] = await Promise.all([ fetchApi>(`/sessions/${sessionId}`), fetchApi>(`/sessions/${sessionId}/tasks`), diff --git a/ui/src/components/chat/AgentCallDisplay.tsx b/ui/src/components/chat/AgentCallDisplay.tsx index f667e5cde..0fb5e7d3f 100644 --- a/ui/src/components/chat/AgentCallDisplay.tsx +++ b/ui/src/components/chat/AgentCallDisplay.tsx @@ -29,10 +29,6 @@ interface AgentCallDisplayProps { subagentSessionId?: string; } -// --------------------------------------------------------------------------- -// SubagentActivityPanel — inline collapsible inside the agent call card -// Polls for new events while the subagent is still running. -// --------------------------------------------------------------------------- interface SubagentActivityPanelProps { sessionId: string; isComplete: boolean; @@ -123,9 +119,6 @@ function SubagentActivityPanel({ sessionId, isComplete }: SubagentActivityPanelP ); } -// --------------------------------------------------------------------------- -// AgentCallDisplay -// --------------------------------------------------------------------------- const AgentCallDisplay = ({ call, result, status = "requested", isError = false, tokenStats, subagentSessionId }: AgentCallDisplayProps) => { const [areInputsExpanded, setAreInputsExpanded] = useState(false); const [areResultsExpanded, setAreResultsExpanded] = useState(false); From 59d684d8ecb59b1b8414cab042d94b902853badb Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Tue, 17 Mar 2026 12:05:52 -0400 Subject: [PATCH 5/8] fix tests Signed-off-by: Jet Chiang --- .../tests/unittests/test_remote_a2a_tool.py | 526 +++++++++--------- ui/src/components/chat/ToolCallDisplay.tsx | 2 +- ui/src/lib/messageHandlers.ts | 2 +- 3 files changed, 255 insertions(+), 275 deletions(-) diff --git a/python/packages/kagent-adk/tests/unittests/test_remote_a2a_tool.py b/python/packages/kagent-adk/tests/unittests/test_remote_a2a_tool.py index 578828665..dd1abb5d2 100644 --- a/python/packages/kagent-adk/tests/unittests/test_remote_a2a_tool.py +++ b/python/packages/kagent-adk/tests/unittests/test_remote_a2a_tool.py @@ -16,7 +16,11 @@ from a2a.types import Part as A2APart from google.adk.tools.tool_confirmation import ToolConfirmation -from kagent.adk._remote_a2a_tool import KAgentRemoteA2ATool, KAgentRemoteA2AToolset +from kagent.adk._remote_a2a_tool import ( + KAgentRemoteA2ATool, + KAgentRemoteA2AToolset, + SubagentSessionProvider, +) from kagent.core.a2a import ( KAGENT_HITL_DECISION_TYPE_APPROVE, KAGENT_HITL_DECISION_TYPE_BATCH, @@ -24,25 +28,48 @@ KAGENT_HITL_DECISION_TYPE_REJECT, ) +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- -def _make_task(state: TaskState, text: str = "", hitl_data: list[dict] | None = None) -> Task: - """Build a minimal Task with the given state and optional text result. +_DEFAULT_USER_ID = "admin@kagent.dev" + + +class _MockSession: + """Minimal session mock providing user_id.""" + + def __init__(self, user_id: str = _DEFAULT_USER_ID): + self.user_id = user_id + + +class MockToolContext: + """Minimal ToolContext mock matching the interface used by KAgentRemoteA2ATool.""" + + def __init__( + self, + tool_confirmation: ToolConfirmation | None = None, + user_id: str = _DEFAULT_USER_ID, + ): + self.state: dict[str, Any] = {} + self.function_call_id = "outer_fc_1" + self.tool_confirmation = tool_confirmation + self.session = _MockSession(user_id) + self._confirmations: dict[str, ToolConfirmation] = {} + + def request_confirmation(self, *, hint: str = "", payload: dict | None = None) -> None: + self._confirmations[self.function_call_id] = ToolConfirmation(hint=hint, payload=payload) - Text is placed in the status message (the fallback path in _extract_text_from_task) - so tests don't need to construct Artifact objects with their required artifactId field. - """ - parts: list[A2APart] = [] +def _make_task(state: TaskState, text: str = "", hitl_data: list[dict] | None = None) -> Task: + """Build a minimal Task with the given state and optional text/HITL data.""" + parts: list[A2APart] = [] if hitl_data: for d in hitl_data: parts.append( A2APart( root=DataPart( data=d, - metadata={ - "adk_type": "function_call", - "adk_is_long_running": True, - }, + metadata={"adk_type": "function_call", "adk_is_long_running": True}, ) ) ) @@ -50,7 +77,6 @@ def _make_task(state: TaskState, text: str = "", hitl_data: list[dict] | None = parts.append(A2APart(root=TextPart(text=text))) status_message = A2AMessage(role=Role.agent, message_id="msg-1", parts=parts) if parts else None - return Task( id="task-1", context_id="ctx-1", @@ -82,19 +108,6 @@ async def _async_yield(*items) -> AsyncIterator: yield item -class MockToolContext: - """Minimal ToolContext mock matching the interface used by KAgentRemoteA2ATool.""" - - def __init__(self, tool_confirmation: ToolConfirmation | None = None): - self.state: dict[str, Any] = {} - self.function_call_id = "outer_fc_1" - self.tool_confirmation = tool_confirmation - self._confirmations: dict[str, ToolConfirmation] = {} - - def request_confirmation(self, *, hint: str = "", payload: dict | None = None) -> None: - self._confirmations[self.function_call_id] = ToolConfirmation(hint=hint, payload=payload) - - def _make_tool(*, httpx_client: httpx.AsyncClient | None = None) -> KAgentRemoteA2ATool: return KAgentRemoteA2ATool( name="k8s_agent", @@ -104,328 +117,300 @@ def _make_tool(*, httpx_client: httpx.AsyncClient | None = None) -> KAgentRemote ) -class TestDirectAgentCall: - """ - Tests for direct agent call. - This was the original behaviour of the AgentTool(RemoteA2aAgent(...)) pairing. +def _patch_client(tool: KAgentRemoteA2ATool, send_side_effect): + """Patch _ensure_client on *tool* so send_message uses *send_side_effect*. + + *send_side_effect* is either a callable (async generator function) + or an async-iterable return value. """ + p = patch.object(tool, "_ensure_client") + mock_ensure = p.start() + mock_client = MagicMock() + if callable(send_side_effect) and not isinstance(send_side_effect, MagicMock): + mock_client.send_message = send_side_effect + else: + mock_client.send_message = MagicMock(return_value=send_side_effect) + mock_ensure.return_value = mock_client + return p, mock_client + + +def _approval_ctx(confirmed: bool, payload: dict | None = None, **kwargs) -> MockToolContext: + confirmation = ToolConfirmation(confirmed=confirmed, payload=payload or {}) + return MockToolContext(tool_confirmation=confirmation, **kwargs) + + +# --------------------------------------------------------------------------- +# First-call tests +# --------------------------------------------------------------------------- - async def test_returns_artifact_text_on_completion(self): + +class TestFirstCall: + """Tests for the initial tool invocation (Phase 1).""" + + async def test_completed_task_returns_result_with_session_id(self): + """Completed task returns dict with result text and subagent_session_id.""" tool = _make_tool() task = _make_task(TaskState.completed, text="all done") + p, _ = _patch_client(tool, _async_yield((task, None))) + try: + result = await tool.run_async(args={"request": "do something"}, tool_context=MockToolContext()) + finally: + p.stop() - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = MagicMock(return_value=_async_yield((task, None))) - mock_ensure.return_value = mock_client + assert isinstance(result, dict) + assert result["result"] == "all done" + assert result["subagent_session_id"] == tool._last_context_id - ctx = MockToolContext() - result = await tool.run_async(args={"request": "do something"}, tool_context=ctx) + async def test_direct_message_response_returns_text(self): + """When remote agent returns an A2AMessage directly, result is plain text.""" + tool = _make_tool() + msg = A2AMessage( + role=Role.agent, + message_id="m1", + parts=[A2APart(root=TextPart(text="direct reply"))], + ) + p, _ = _patch_client(tool, _async_yield(msg)) + try: + result = await tool.run_async(args={"request": "hi"}, tool_context=MockToolContext()) + finally: + p.stop() - assert result == "all done" + assert result == "direct reply" - async def test_stores_context_id_after_completion(self): + async def test_no_result_returns_fallback_string(self): + """When remote agent yields nothing, a fallback error string is returned.""" tool = _make_tool() - task = Task( - id="task-1", - context_id="ctx-abc", - status=TaskStatus( - state=TaskState.completed, - message=A2AMessage( - role=Role.agent, - message_id="msg-1", - parts=[A2APart(root=TextPart(text="ok"))], - ), - ), - ) + p, _ = _patch_client(tool, _async_yield()) + try: + result = await tool.run_async(args={"request": "hi"}, tool_context=MockToolContext()) + finally: + p.stop() - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = MagicMock(return_value=_async_yield((task, None))) - mock_ensure.return_value = mock_client + assert "no result" in result.lower() - ctx = MockToolContext() - await tool.run_async(args={"request": "go"}, tool_context=ctx) + async def test_failed_task_returns_error_text(self): + """Failed tasks return the error text from the task status message.""" + tool = _make_tool() + task = _make_task(TaskState.failed, text="something broke") + p, _ = _patch_client(tool, _async_yield((task, None))) + try: + result = await tool.run_async(args={"request": "go"}, tool_context=MockToolContext()) + finally: + p.stop() - assert tool._last_context_id == "ctx-abc" + assert result == "something broke" - async def test_passes_stored_context_id_on_subsequent_call(self): + async def test_context_id_sent_in_outgoing_message(self): + """The tool's pre-generated context_id is sent on the outgoing A2A message.""" tool = _make_tool() - tool._last_context_id = "prev-ctx" task = _make_task(TaskState.completed, text="ok") + sent: list[A2AMessage] = [] - sent_messages: list[A2AMessage] = [] - - async def capturing_send(*, request: A2AMessage): - sent_messages.append(request) + async def capture(*, request, **kw): + sent.append(request) yield (task, None) - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = capturing_send - mock_ensure.return_value = mock_client - - ctx = MockToolContext() - await tool.run_async(args={"request": "hello"}, tool_context=ctx) + p, _ = _patch_client(tool, capture) + try: + await tool.run_async(args={"request": "hello"}, tool_context=MockToolContext()) + finally: + p.stop() - assert sent_messages[0].context_id == "prev-ctx" + assert sent[0].context_id == tool._last_context_id - async def test_direct_message_response_returns_text(self): + async def test_user_id_forwarded_in_call_context(self): + """The parent session's user_id is forwarded via ClientCallContext.""" tool = _make_tool() - msg = A2AMessage( - role=Role.agent, - message_id="m1", - context_id="ctx-direct", - parts=[A2APart(root=TextPart(text="direct reply"))], - ) - - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = MagicMock(return_value=_async_yield(msg)) - mock_ensure.return_value = mock_client - - ctx = MockToolContext() - result = await tool.run_async(args={"request": "hi"}, tool_context=ctx) + task = _make_task(TaskState.completed, text="ok") + captured_contexts: list = [] - assert result == "direct reply" - assert tool._last_context_id == "ctx-direct" + async def capture(*, request, context=None, **kw): + captured_contexts.append(context) + yield (task, None) - async def test_no_result_returns_fallback_string(self): - tool = _make_tool() + p, _ = _patch_client(tool, capture) + try: + ctx = MockToolContext(user_id="alice@example.com") + await tool.run_async(args={"request": "go"}, tool_context=ctx) + finally: + p.stop() - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = MagicMock(return_value=_async_yield()) - mock_ensure.return_value = mock_client + assert captured_contexts[0].state["x-user-id"] == "alice@example.com" - ctx = MockToolContext() - result = await tool.run_async(args={"request": "hi"}, tool_context=ctx) - assert "no result" in result.lower() +# --------------------------------------------------------------------------- +# HITL input_required tests +# --------------------------------------------------------------------------- class TestHITLInputRequired: - async def test_calls_request_confirmation_on_input_required(self): - tool = _make_tool() - task = _make_hitl_task(tool_name="delete_file", tool_call_id="call_1") - - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = MagicMock(return_value=_async_yield((task, None))) - mock_ensure.return_value = mock_client + """Tests for when the subagent returns input_required.""" + async def test_calls_request_confirmation(self): + """request_confirmation is called with a hint naming the inner tool.""" + tool = _make_tool() + task = _make_hitl_task(tool_name="delete_file") + p, _ = _patch_client(tool, _async_yield((task, None))) + try: ctx = MockToolContext() - _ = await tool.run_async(args={"request": "delete it"}, tool_context=ctx) + await tool.run_async(args={"request": "delete it"}, tool_context=ctx) + finally: + p.stop() - # request_confirmation() should have been invoked assert ctx.function_call_id in ctx._confirmations conf = ctx._confirmations[ctx.function_call_id] assert "delete_file" in conf.hint - async def test_confirmation_payload_contains_task_and_context_id(self): + async def test_confirmation_payload(self): + """Payload contains task_id, context_id, subagent_name, and hitl_parts.""" tool = _make_tool() - task = _make_hitl_task() - - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = MagicMock(return_value=_async_yield((task, None))) - mock_ensure.return_value = mock_client - + task = _make_hitl_task(tool_name="write_file", tool_call_id="c99") + p, _ = _patch_client(tool, _async_yield((task, None))) + try: ctx = MockToolContext() await tool.run_async(args={"request": "go"}, tool_context=ctx) + finally: + p.stop() payload = ctx._confirmations[ctx.function_call_id].payload assert payload["task_id"] == "task-1" assert payload["context_id"] == "ctx-1" assert payload["subagent_name"] == "k8s_agent" - - async def test_confirmation_payload_contains_hitl_parts(self): - tool = _make_tool() - task = _make_hitl_task(tool_name="write_file", tool_call_id="c99") - - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = MagicMock(return_value=_async_yield((task, None))) - mock_ensure.return_value = mock_client - - ctx = MockToolContext() - await tool.run_async(args={"request": "go"}, tool_context=ctx) - - payload = ctx._confirmations[ctx.function_call_id].payload + # hitl_parts should contain the serialized HITL info hitl_parts = payload["hitl_parts"] - assert hitl_parts is not None assert len(hitl_parts) == 1 assert hitl_parts[0]["originalFunctionCall"]["name"] == "write_file" assert hitl_parts[0]["originalFunctionCall"]["id"] == "c99" -def _approval_ctx(confirmed: bool, payload: dict | None = None) -> MockToolContext: - confirmation = ToolConfirmation(confirmed=confirmed, payload=payload or {}) - return MockToolContext(tool_confirmation=confirmation) +# --------------------------------------------------------------------------- +# HITL resume tests (Phase 2) +# --------------------------------------------------------------------------- +_RESUME_PAYLOAD = {"task_id": "task-1", "context_id": "ctx-1", "subagent_name": "k8s_agent"} -class TestHITLUniformDecisions: - async def test_approve_sends_approve_decision(self): - tool = _make_tool() - final_task = _make_task(TaskState.completed, text="done after approve") - sent_messages: list[A2AMessage] = [] +class TestHITLResume: + """Tests for resume after HITL confirmation (Phase 2).""" - async def capturing_send(*, request: A2AMessage): - sent_messages.append(request) - yield (final_task, None) + async def _resume( + self, + tool: KAgentRemoteA2ATool, + confirmed: bool, + payload: dict, + response_task: Task | None = None, + ) -> tuple[Any, list[A2AMessage]]: + """Run a resume and return (result, sent_messages).""" + if response_task is None: + response_task = _make_task(TaskState.completed, text="ok") + sent: list[A2AMessage] = [] - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = capturing_send - mock_ensure.return_value = mock_client + async def capture(*, request, **kw): + sent.append(request) + yield (response_task, None) - payload = {"task_id": "task-1", "context_id": "ctx-1", "subagent_name": "k8s_agent"} - ctx = _approval_ctx(confirmed=True, payload=payload) + p, _ = _patch_client(tool, capture) + try: + ctx = _approval_ctx(confirmed=confirmed, payload=payload) result = await tool.run_async(args={}, tool_context=ctx) + finally: + p.stop() + return result, sent - assert result == "done after approve" - assert len(sent_messages) == 1 - msg = sent_messages[0] - data = msg.parts[0].root.data + async def test_approve_sends_approve_decision(self): + tool = _make_tool() + result, sent = await self._resume( + tool, + confirmed=True, + payload=_RESUME_PAYLOAD, + response_task=_make_task(TaskState.completed, text="approved"), + ) + assert result["result"] == "approved" + data = sent[0].parts[0].root.data assert data[KAGENT_HITL_DECISION_TYPE_KEY] == KAGENT_HITL_DECISION_TYPE_APPROVE + # Verify task_id and context_id are routed correctly + assert sent[0].task_id == "task-1" + assert sent[0].context_id == "ctx-1" async def test_reject_sends_reject_decision(self): tool = _make_tool() - final_task = _make_task(TaskState.completed, text="done after reject") - - sent_messages: list[A2AMessage] = [] - - async def capturing_send(*, request: A2AMessage): - sent_messages.append(request) - yield (final_task, None) - - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = capturing_send - mock_ensure.return_value = mock_client - - payload = {"task_id": "task-1", "context_id": "ctx-1", "subagent_name": "k8s_agent"} - ctx = _approval_ctx(confirmed=False, payload=payload) - result = await tool.run_async(args={}, tool_context=ctx) - - assert result == "done after reject" - data = sent_messages[0].parts[0].root.data + _, sent = await self._resume(tool, confirmed=False, payload=_RESUME_PAYLOAD) + data = sent[0].parts[0].root.data assert data[KAGENT_HITL_DECISION_TYPE_KEY] == KAGENT_HITL_DECISION_TYPE_REJECT - async def test_reject_with_reason_forwards_reason(self): - tool = _make_tool() - final_task = _make_task(TaskState.completed, text="ok") - - sent_messages: list[A2AMessage] = [] - - async def capturing_send(*, request: A2AMessage): - sent_messages.append(request) - yield (final_task, None) - - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = capturing_send - mock_ensure.return_value = mock_client - - payload = { - "task_id": "task-1", - "context_id": "ctx-1", - "subagent_name": "k8s_agent", - "rejection_reason": "Too risky", - } - ctx = _approval_ctx(confirmed=False, payload=payload) - await tool.run_async(args={}, tool_context=ctx) - - data = sent_messages[0].parts[0].root.data - assert data.get("rejection_reason") == "Too risky" - - async def test_resume_routes_to_correct_task_and_context(self): + async def test_reject_with_reason(self): tool = _make_tool() - final_task = _make_task(TaskState.completed, text="ok") + payload = {**_RESUME_PAYLOAD, "rejection_reason": "Too risky"} + _, sent = await self._resume(tool, confirmed=False, payload=payload) + data = sent[0].parts[0].root.data + assert data["rejection_reason"] == "Too risky" - sent_messages: list[A2AMessage] = [] - - async def capturing_send(*, request: A2AMessage): - sent_messages.append(request) - yield (final_task, None) - - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = capturing_send - mock_ensure.return_value = mock_client - - payload = {"task_id": "task-99", "context_id": "ctx-99", "subagent_name": "k8s_agent"} - ctx = _approval_ctx(confirmed=True, payload=payload) - await tool.run_async(args={}, tool_context=ctx) - - msg = sent_messages[0] - assert msg.task_id == "task-99" - assert msg.context_id == "ctx-99" - - -class TestHITLBatchDecisions: async def test_batch_decisions_forwarded(self): tool = _make_tool() - final_task = _make_task(TaskState.completed, text="batch done") - - sent_messages: list[A2AMessage] = [] - - async def capturing_send(*, request: A2AMessage): - sent_messages.append(request) - yield (final_task, None) - - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = capturing_send - mock_ensure.return_value = mock_client - - payload = { - "task_id": "task-1", - "context_id": "ctx-1", - "subagent_name": "k8s_agent", - "batch_decisions": {"call_1": "approve", "call_2": "reject"}, - } - ctx = _approval_ctx(confirmed=True, payload=payload) - result = await tool.run_async(args={}, tool_context=ctx) - - assert result == "batch done" - data = sent_messages[0].parts[0].root.data + payload = { + **_RESUME_PAYLOAD, + "batch_decisions": {"call_1": "approve", "call_2": "reject"}, + } + result, sent = await self._resume(tool, confirmed=True, payload=payload) + data = sent[0].parts[0].root.data assert data[KAGENT_HITL_DECISION_TYPE_KEY] == KAGENT_HITL_DECISION_TYPE_BATCH assert data["decisions"] == {"call_1": "approve", "call_2": "reject"} - async def test_batch_with_rejection_reasons_forwarded(self): + async def test_batch_with_rejection_reasons(self): + tool = _make_tool() + payload = { + **_RESUME_PAYLOAD, + "batch_decisions": {"call_1": "approve", "call_2": "reject"}, + "rejection_reasons": {"call_2": "Too dangerous"}, + } + _, sent = await self._resume(tool, confirmed=True, payload=payload) + data = sent[0].parts[0].root.data + assert data["rejection_reasons"] == {"call_2": "Too dangerous"} + + async def test_ask_user_answers_forwarded(self): + """ask_user answers are forwarded as approve with ask_user_answers payload.""" tool = _make_tool() - final_task = _make_task(TaskState.completed, text="ok") + payload = {**_RESUME_PAYLOAD, "answers": ["yes", "42"]} + _, sent = await self._resume(tool, confirmed=True, payload=payload) + data = sent[0].parts[0].root.data + assert data[KAGENT_HITL_DECISION_TYPE_KEY] == KAGENT_HITL_DECISION_TYPE_APPROVE + assert data["ask_user_answers"] == ["yes", "42"] - sent_messages: list[A2AMessage] = [] + async def test_missing_task_id_returns_error(self): + """Resume without task_id in payload returns an error string.""" + tool = _make_tool() + ctx = _approval_ctx(confirmed=True, payload={"context_id": "ctx-1"}) + result = await tool.run_async(args={}, tool_context=ctx) + assert "missing task context" in result.lower() - async def capturing_send(*, request: A2AMessage): - sent_messages.append(request) - yield (final_task, None) + async def test_resume_returns_subagent_session_id(self): + """Resume result includes the subagent_session_id from the confirmation payload.""" + tool = _make_tool() + result, _ = await self._resume(tool, confirmed=True, payload=_RESUME_PAYLOAD) + assert result["subagent_session_id"] == "ctx-1" - with patch.object(tool, "_ensure_client") as mock_ensure: - mock_client = MagicMock() - mock_client.send_message = capturing_send - mock_ensure.return_value = mock_client + async def test_resume_input_required_chains(self): + """If the subagent returns input_required again after resume, it chains.""" + tool = _make_tool() + chained_task = _make_hitl_task(tool_name="restart_pod") + p, _ = _patch_client(tool, _async_yield((chained_task, None))) + try: + ctx = _approval_ctx(confirmed=True, payload=_RESUME_PAYLOAD) + result = await tool.run_async(args={}, tool_context=ctx) + finally: + p.stop() - payload = { - "task_id": "task-1", - "context_id": "ctx-1", - "subagent_name": "k8s_agent", - "batch_decisions": {"call_1": "approve", "call_2": "reject"}, - "rejection_reasons": {"call_2": "Too dangerous"}, - } - ctx = _approval_ctx(confirmed=True, payload=payload) - await tool.run_async(args={}, tool_context=ctx) + assert result["waiting_for"] == "subagent_approval" + assert ctx.function_call_id in ctx._confirmations + assert "restart_pod" in ctx._confirmations[ctx.function_call_id].hint - data = sent_messages[0].parts[0].root.data - assert data.get("rejection_reasons") == {"call_2": "Too dangerous"} +# --------------------------------------------------------------------------- +# Toolset lifecycle tests +# --------------------------------------------------------------------------- -class TestToolsetCloseLifecycle: - """Tests for the toolset close lifecycle.""" +class TestToolsetLifecycle: async def test_close_closes_owned_client(self): mock_client = AsyncMock(spec=httpx.AsyncClient) toolset = KAgentRemoteA2AToolset( @@ -434,9 +419,7 @@ async def test_close_closes_owned_client(self): agent_card_url="http://agent/.well-known/agent.json", httpx_client=mock_client, ) - await toolset.close() - mock_client.aclose.assert_awaited_once() assert toolset._httpx_client is None @@ -448,10 +431,8 @@ async def test_close_is_idempotent(self): agent_card_url="http://agent/.well-known/agent.json", httpx_client=mock_client, ) - await toolset.close() - await toolset.close() # second call must not raise - + await toolset.close() mock_client.aclose.assert_awaited_once() async def test_get_tools_returns_the_tool(self): @@ -462,7 +443,6 @@ async def test_get_tools_returns_the_tool(self): agent_card_url="http://agent/.well-known/agent.json", httpx_client=mock_client, ) - tools = await toolset.get_tools() assert len(tools) == 1 assert isinstance(tools[0], KAgentRemoteA2ATool) diff --git a/ui/src/components/chat/ToolCallDisplay.tsx b/ui/src/components/chat/ToolCallDisplay.tsx index 8339ee9c6..9ebaec3e7 100644 --- a/ui/src/components/chat/ToolCallDisplay.tsx +++ b/ui/src/components/chat/ToolCallDisplay.tsx @@ -132,7 +132,7 @@ const extractToolCallResults = (message: Message): ProcessedToolResultData[] => const data = part.data as unknown as ToolResponseData; // For agent tool responses we receive { result, subagent_session_id } as FunctionResponse.response. - let textContent = normalizeToolResultToText(data); + const textContent = normalizeToolResultToText(data); let subagentSessionId: string | undefined; if (isAgentToolName(data.name)) { const responseObj = data.response as Record | undefined; diff --git a/ui/src/lib/messageHandlers.ts b/ui/src/lib/messageHandlers.ts index bd610d8cb..e5fa808ff 100644 --- a/ui/src/lib/messageHandlers.ts +++ b/ui/src/lib/messageHandlers.ts @@ -696,7 +696,7 @@ export const createMessageHandlers = (handlers: MessageHandlers) => { taskId: string | undefined, defaultSource: string ) => { - let content = normalizeToolResultToText(toolData); + const content = normalizeToolResultToText(toolData); let subagentSessionId: string | undefined; if (isAgentToolName(toolData.name)) { From f9636237d4fea3443be6b1dd4d6ae1bcf42b0f4a Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Tue, 17 Mar 2026 13:03:10 -0400 Subject: [PATCH 6/8] fix comments and rebase Signed-off-by: Jet Chiang --- docs/architecture/subagent-viewing.md | 2 +- ui/src/app/actions/sessions.ts | 2 +- ui/src/components/chat/AgentCallDisplay.tsx | 9 +- ui/src/lib/__tests__/messageHandlers.test.ts | 129 +++++++++++++++++++ ui/src/lib/messageHandlers.ts | 18 ++- 5 files changed, 155 insertions(+), 5 deletions(-) diff --git a/docs/architecture/subagent-viewing.md b/docs/architecture/subagent-viewing.md index 825b479a6..d98411e29 100644 --- a/docs/architecture/subagent-viewing.md +++ b/docs/architecture/subagent-viewing.md @@ -14,7 +14,7 @@ When a parent agent delegates to a subagent via `KAgentRemoteA2ATool`, the subag ### 1. Session ID stamping (Python, parent agent) -`KAgentRemoteA2ATool` pre-generates a `context_id` (UUID) in `__init__` before the tool runs. The event converter stamps this ID as `adk_subagent_session_id` metadata onto the `function_call` DataPart, so the UI knows the subagent session ID as soon as the LLM emits the call — before the tool actually executes. +`KAgentRemoteA2ATool` pre-generates a `context_id` (UUID) in `__init__` before the tool runs. The event converter stamps this ID as `kagent_subagent_session_id` metadata onto the `function_call` DataPart, so the UI knows the subagent session ID as soon as the LLM emits the call — before the tool actually executes. ### 2. User ID passthrough (Python, A2A interceptor) diff --git a/ui/src/app/actions/sessions.ts b/ui/src/app/actions/sessions.ts index 66262343c..3f987076f 100644 --- a/ui/src/app/actions/sessions.ts +++ b/ui/src/app/actions/sessions.ts @@ -107,7 +107,7 @@ export async function getSubagentSessionWithEvents( const session = sessionResp.data?.session; if (!session) { - return { message: "Subagent session not found" }; + return { message: "Subagent session not found", error: "Subagent session not found" }; } return { message: "Session with events fetched successfully", diff --git a/ui/src/components/chat/AgentCallDisplay.tsx b/ui/src/components/chat/AgentCallDisplay.tsx index 0fb5e7d3f..0c8dcd1be 100644 --- a/ui/src/components/chat/AgentCallDisplay.tsx +++ b/ui/src/components/chat/AgentCallDisplay.tsx @@ -41,7 +41,7 @@ function SubagentActivityPanel({ sessionId, isComplete }: SubagentActivityPanelP useEffect(() => { let cancelled = false; - let timeoutId: ReturnType; + let timeoutId: ReturnType | undefined; const fetchEvents = async () => { try { @@ -78,7 +78,12 @@ function SubagentActivityPanel({ sessionId, isComplete }: SubagentActivityPanelP }; fetchEvents(); - return () => { cancelled = true; clearTimeout(timeoutId); }; + return () => { + cancelled = true; + if (timeoutId) { + clearTimeout(timeoutId); + } + }; }, [sessionId, isComplete]); if (error) { diff --git a/ui/src/lib/__tests__/messageHandlers.test.ts b/ui/src/lib/__tests__/messageHandlers.test.ts index a69d69ece..de95321ab 100644 --- a/ui/src/lib/__tests__/messageHandlers.test.ts +++ b/ui/src/lib/__tests__/messageHandlers.test.ts @@ -354,6 +354,135 @@ describe('createMessageHandlers test', () => { }); }); +describe('subagent_session_id propagation', () => { + // Shared handler factory for status-update / artifact-update tests + function makeHandlers() { + const emitted: Message[] = []; + const handlers = createMessageHandlers({ + setMessages: (updater) => { + const next = updater(emitted); + emitted.length = 0; + emitted.push(...next); + }, + setIsStreaming: () => {}, + setStreamingContent: () => {}, + setChatStatus: () => {}, + agentContext: { namespace: 'kagent', agentName: 'testagent' }, + }); + return { emitted, handlers }; + } + + test('status-update: agent function_call with kagent_subagent_session_id in DataPart metadata emits toolCallData with subagent_session_id', () => { + const { emitted, handlers } = makeHandlers(); + + const statusUpdateCall: any = { + kind: 'status-update', contextId: 'ctx', taskId: 'task', final: false, + status: { + state: 'working', + message: { + role: 'agent', + parts: [{ + kind: 'data', + data: { id: 'agent_call_1', name: 'kagent__NS__k8s_agent', args: { request: 'list pods' } }, + metadata: { kagent_type: 'function_call', kagent_subagent_session_id: 'sess-abc-123' }, + }], + }, + }, + }; + handlers.handleMessageEvent(statusUpdateCall); + + expect(emitted.length).toBe(1); + const meta = emitted[0].metadata as ADKMetadata; + expect(meta.originalType).toBe('ToolCallRequestEvent'); + expect(meta.toolCallData).toHaveLength(1); + expect(meta.toolCallData![0].subagent_session_id).toBe('sess-abc-123'); + }); + + test('status-update: agent function_response with subagent_session_id in response dict emits toolResultData with subagent_session_id', () => { + const { emitted, handlers } = makeHandlers(); + + const statusUpdateResp: any = { + kind: 'status-update', contextId: 'ctx', taskId: 'task', final: false, + status: { + state: 'working', + message: { + role: 'agent', + parts: [{ + kind: 'data', + data: { + id: 'agent_call_1', + name: 'kagent__NS__k8s_agent', + response: { result: 'done', subagent_session_id: 'sess-abc-123' }, + }, + metadata: { kagent_type: 'function_response' }, + }], + }, + }, + }; + handlers.handleMessageEvent(statusUpdateResp); + + const execMsg = emitted.find(m => (m.metadata as ADKMetadata)?.originalType === 'ToolCallExecutionEvent'); + expect(execMsg).toBeDefined(); + const resultData = (execMsg!.metadata as ADKMetadata).toolResultData!; + expect(resultData).toHaveLength(1); + expect(resultData[0].subagent_session_id).toBe('sess-abc-123'); + }); + + test('extractMessagesFromTasks: agent function_call DataPart with kagent_subagent_session_id emits toolCallData with subagent_session_id', () => { + const tasks = [{ + contextId: 'ctx', + id: 'task', + history: [{ + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ + kind: 'data', + data: { id: 'agent_call_3', name: 'kagent__NS__k8s_agent', args: { request: 'list nodes' } }, + metadata: { kagent_type: 'function_call', kagent_subagent_session_id: 'sess-history-456' }, + }], + metadata: {}, + }], + }] as unknown as Task[]; + + const messages = extractMessagesFromTasks(tasks); + expect(messages).toHaveLength(1); + const meta = messages[0].metadata as ADKMetadata; + expect(meta.originalType).toBe('ToolCallRequestEvent'); + expect(meta.toolCallData).toHaveLength(1); + expect(meta.toolCallData![0].subagent_session_id).toBe('sess-history-456'); + }); + + test('extractMessagesFromTasks: agent function_response DataPart with subagent_session_id in response dict emits toolResultData with subagent_session_id', () => { + const tasks = [{ + contextId: 'ctx', + id: 'task', + history: [{ + kind: 'message', + messageId: 'msg-3', + role: 'agent', + parts: [{ + kind: 'data', + data: { + id: 'agent_call_3', + name: 'kagent__NS__k8s_agent', + response: { result: 'nodes listed', subagent_session_id: 'sess-history-456' }, + }, + metadata: { kagent_type: 'function_response' }, + }], + metadata: {}, + }], + }] as unknown as Task[]; + + const messages = extractMessagesFromTasks(tasks); + expect(messages).toHaveLength(1); + const meta = messages[0].metadata as ADKMetadata; + expect(meta.originalType).toBe('ToolCallExecutionEvent'); + expect(meta.toolResultData).toHaveLength(1); + expect(meta.toolResultData![0].subagent_session_id).toBe('sess-history-456'); + }); +}); + describe('getMetadataValue', () => { test('reads kagent_ prefixed key', () => { expect(getMetadataValue({ kagent_type: 'function_call' }, 'type')).toBe('function_call'); diff --git a/ui/src/lib/messageHandlers.ts b/ui/src/lib/messageHandlers.ts index e5fa808ff..638337ae9 100644 --- a/ui/src/lib/messageHandlers.ts +++ b/ui/src/lib/messageHandlers.ts @@ -84,12 +84,20 @@ export function extractMessagesFromTasks(tasks: Task[]): Message[] { // the function_response and are stamped on this card below. // Regular tool calls use the message's own invocation stats. const toolStats = isAgentToolName(toolData.name) ? undefined : msgStats; + const fcSubagentSessionId = isAgentToolName(toolData.name) + ? getMetadataValue(partMeta, "subagent_session_id") + : undefined; messages.push(createMessage("", source, { originalType: "ToolCallRequestEvent", contextId: msgContextId, taskId: msgTaskId, additionalMetadata: { - toolCallData: [{ id: toolData.id, name: toolData.name, args: (toolData.args as Record) || {} }], + toolCallData: [{ + id: toolData.id, + name: toolData.name, + args: (toolData.args as Record) || {}, + ...(fcSubagentSessionId ? { subagent_session_id: fcSubagentSessionId } : {}), + }], ...(toolStats && { tokenStats: toolStats }), }, })); @@ -97,6 +105,13 @@ export function extractMessagesFromTasks(tasks: Task[]): Message[] { } else if (partType === "function_response") { const toolData = dp.data as unknown as ToolResponseData; + let frSubagentSessionId: string | undefined; + if (isAgentToolName(toolData.name)) { + const responseObj = toolData.response as Record | undefined; + if (responseObj && typeof responseObj.subagent_session_id === "string") { + frSubagentSessionId = responseObj.subagent_session_id; + } + } messages.push(createMessage("", source, { originalType: "ToolCallExecutionEvent", contextId: msgContextId, @@ -107,6 +122,7 @@ export function extractMessagesFromTasks(tasks: Task[]): Message[] { name: toolData.name, content: normalizeToolResultToText(toolData), is_error: toolData.response?.isError || false, + ...(frSubagentSessionId ? { subagent_session_id: frSubagentSessionId } : {}), }], }, })); From eec886ea349698fcdcc42b8aa61a6be14400d5b4 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Tue, 17 Mar 2026 15:02:45 -0400 Subject: [PATCH 7/8] docs and fix lint Signed-off-by: Jet Chiang --- docs/architecture/a2a-subagents.md | 61 ++++++++++++++ docs/architecture/subagent-viewing.md | 82 ------------------- .../src/kagent/adk/_agent_executor.py | 2 +- .../src/kagent/adk/_remote_a2a_tool.py | 6 +- 4 files changed, 67 insertions(+), 84 deletions(-) create mode 100644 docs/architecture/a2a-subagents.md delete mode 100644 docs/architecture/subagent-viewing.md diff --git a/docs/architecture/a2a-subagents.md b/docs/architecture/a2a-subagents.md new file mode 100644 index 000000000..aa94d4b79 --- /dev/null +++ b/docs/architecture/a2a-subagents.md @@ -0,0 +1,61 @@ +# A2A Subagents + +Kagent allows users to add subagents (other agents running on Kagent or remotely) as tools to a main agent, connected via the A2A protocol. This feature is enabled by `KAgentRemoteA2ATool` (`python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py`), kagent's custom replacement for the upstream `AgentTool(RemoteA2aAgent(...))` pairing. + +It directly manages the A2A conversation with a remote subagent and adds three things the upstream lacks: HITL propagation, live activity viewing, and user ID forwarding. + +See [human-in-the-loop.md](human-in-the-loop.md) for HITL details. + +--- + +## How it works + +Each parent A2A request creates a fresh `Runner` and fresh tool instances. `KAgentRemoteA2ATool.__init__` generates a UUID (`_last_context_id`) that is used as the A2A `context_id` for every message sent to the subagent. On the subagent side, this `context_id` becomes the session ID. + +`run_async` has two phases: + +- **Phase 1** (normal call): sends the request to the subagent and handles the response — returning the result, pausing for HITL if the subagent returns `input_required`, or returning an error string. +- **Phase 2** (HITL resume): reads the stored `task_id`/`context_id` from `tool_context.tool_confirmation.payload` and forwards the user's decision (approve / reject / batch / ask-user answers) to the subagent's pending task. + +On success, `run_async` returns: +```python +{"result": str, "subagent_session_id": str} # normal +{"result": str, "subagent_session_id": str, + "kagent_usage_metadata": dict} # with usage +{"status": "pending", "waiting_for": "subagent_approval", ...} # HITL pause +``` + +`KAgentRemoteA2AToolset` is a thin `BaseToolset` wrapper whose only job is ensuring the owned `httpx.AsyncClient` is closed when the runner shuts down — ADK's cleanup path only discovers `BaseToolset` instances, not bare `BaseTool` instances. + +--- + +## User ID and session tagging + +`_SubagentInterceptor` is registered on the A2A client at construction time and injects two headers on every outgoing request: + +| Header | Value | Purpose | +|---|---|---| +| `x-user-id` | parent session's user ID | Scopes the subagent DB session to the same user | +| `x-kagent-source` | `"subagent"` | Hides the session from the agent's session history sidebar | + +> Interceptors must be passed to `ClientFactory.create(interceptors=[...])` — `A2AClient.add_request_middleware()` appends to a list that the transport never reads. + +On the subagent side, `KAgentRequestContextBuilder` reads these headers and passes them through to `_prepare_session`, which calls `KAgentSessionService.create_session()` with `source="subagent"`. The Go layer stores this in a `Source` column and excludes such sessions from `ListSessionsForAgent`. + +--- + +## Live activity viewing + +The UI can show what a subagent is doing in a live panel before it finishes. This works because the session ID is known before the tool runs: + +Before the run loop, `A2aAgentExecutor` builds a `{tool_name → session_id}` map from all tools implementing the `SubagentSessionProvider` protocol (`subagent_session_id` property). The event converter stamps this as `kagent_subagent_session_id` metadata on each `function_call` DataPart as soon as the LLM emits the call. The UI reads it immediately and begins polling `/api/sessions/{id}` every 2 seconds, rendering the subagent's events as a nested chat thread. Nesting is capped at depth 3. + +The map is keyed by tool name because within one parent request, all calls to the same subagent tool intentionally share one `context_id` — giving the subagent conversation continuity across sequential invocations. A fresh `context_id` is generated on the next parent request when the runner rebuilds. + +When sending session requests to Go backend, take note that: + +| Session query | Includes subagent sessions? | +|---|---| +| `GET /api/sessions/agent/{ns}/{name}` | No — filtered by `source != 'subagent'` | +| `GET /api/sessions/{id}` | Yes | +| `GET /api/sessions/{id}/tasks` | Yes | diff --git a/docs/architecture/subagent-viewing.md b/docs/architecture/subagent-viewing.md deleted file mode 100644 index d98411e29..000000000 --- a/docs/architecture/subagent-viewing.md +++ /dev/null @@ -1,82 +0,0 @@ -# Subagent Live Activity Viewing - -Allows users to see what a subagent is doing **during and after** a parent agent's execution, without leaving the parent chat. An "Activity" panel appears inline on each subagent tool call card. - ---- - -## Overview - -When a parent agent delegates to a subagent via `KAgentRemoteA2ATool`, the subagent's session is created in the database under the same user. The UI polls that session for live events and renders them as a nested chat thread inside the parent conversation. - ---- - -## Request Flow - -### 1. Session ID stamping (Python, parent agent) - -`KAgentRemoteA2ATool` pre-generates a `context_id` (UUID) in `__init__` before the tool runs. The event converter stamps this ID as `kagent_subagent_session_id` metadata onto the `function_call` DataPart, so the UI knows the subagent session ID as soon as the LLM emits the call — before the tool actually executes. - -### 2. User ID passthrough (Python, A2A interceptor) - -`_SubagentInterceptor` is registered at A2A client construction time. It injects two headers on every outgoing A2A request: - -| Header | Value | Purpose | -|--------|-------|---------| -| `x-user-id` | parent session's `user_id` | Scopes the subagent session to the same DB user | -| `x-kagent-source` | `subagent` | Tags the session as subagent-originated | - -> **Note on A2A SDK:** `A2AClient.add_request_middleware()` appends to `Client._middleware`, which is never read by the transport. Interceptors must be passed to `ClientFactory.create(interceptors=[...])` at construction time to be registered on `JsonRpcTransport.interceptors`. - -### 3. Subagent session creation (Python, subagent runtime) - -The subagent's `KAgentRequestContextBuilder` reads both headers from `context.state["headers"]` (populated automatically by the A2A SDK's `DefaultCallContextBuilder`): - -- `x-user-id` → sets `context.user = KAgentUser(user_id=...)` so the session is stored under the correct user -- `x-kagent-source` → stored in `context.state["kagent_source"]` - -`_prepare_session` reads `kagent_source` directly from the request context. The value is threaded through `state["source"]` → `KAgentSessionService.create_session()` → Go `POST /api/sessions` with `"source": "subagent"` in the body. - -### 4. Session storage (Go) - -The `Session` model has a `Source *string` column (`"subagent"` or `nil`). `ListSessionsForAgent` excludes subagent sessions (`WHERE source IS NULL OR source != 'subagent'`), so they don't appear in the agent's session history sidebar. `GetSession` is unfiltered — the UI can still fetch them by ID. - -### 5. UI polling - -`AgentCallDisplay` shows an "Activity" button when `subagentSessionId` is present. Clicking it opens `SubagentActivityPanel`, which polls `getSubagentSessionWithEvents(sessionId)` every 2 seconds while the subagent is running, stopping once `isComplete`. - -Depth is tracked via `ActivityDepthContext` (max depth: 3) to prevent unbounded nesting. - ---- - -## Data Flow Summary - -``` -Parent LLM emits function_call for subagent tool - → event_converter stamps adk_subagent_session_id on DataPart - → UI extracts sessionId from DataPart metadata - → "Activity" button appears on AgentCallDisplay - -KAgentRemoteA2ATool executes - → _SubagentInterceptor adds x-user-id + x-kagent-source: subagent - → A2A request → subagent pod - -Subagent pod receives request - → KAgentRequestContextBuilder extracts headers - → _prepare_session creates session in DB: - user_id = admin@kagent.dev (from x-user-id) - source = "subagent" (from x-kagent-source) - -UI SubagentActivityPanel polls /api/sessions/{id} + /api/sessions/{id}/tasks - → Renders subagent's messages as nested chat thread - → Stops polling once parent function_response received -``` - ---- - -## Session Visibility - -| Query | Includes subagent sessions? | -|-------|----------------------------| -| `GET /api/sessions/agent/{ns}/{name}` (session history sidebar) | No — filtered by `source != 'subagent'` | -| `GET /api/sessions/{id}` (direct fetch by ID) | Yes — unfiltered | -| `GET /api/sessions/{id}/tasks` | Yes — unfiltered | diff --git a/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py b/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py index ac29ad9f9..5577c8f50 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py +++ b/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py @@ -53,8 +53,8 @@ ) from ._mcp_toolset import is_anyio_cross_task_cancel_scope_error -from .converters.event_converter import convert_event_to_a2a_events, serialize_metadata_value from ._remote_a2a_tool import SubagentSessionProvider +from .converters.event_converter import convert_event_to_a2a_events, serialize_metadata_value from .converters.part_converter import convert_a2a_part_to_genai_part, convert_genai_part_to_a2a_part from .converters.request_converter import convert_a2a_request_to_adk_run_args diff --git a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py index 40b041fc8..e94ca666e 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py +++ b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py @@ -413,7 +413,11 @@ async def _handle_resume(self, tool_context: ToolContext) -> Any: result_text = _extract_text_from_task(task) usage = _extract_usage_from_task(task) if usage: - return {"result": result_text, "kagent_usage_metadata": usage, "subagent_session_id": context_id or self._last_context_id} + return { + "result": result_text, + "kagent_usage_metadata": usage, + "subagent_session_id": context_id or self._last_context_id, + } # context_id from the confirmation payload is the original subagent session ID in case of interrupts return {"result": result_text, "subagent_session_id": context_id or self._last_context_id} From 5d1aeba397fc18aa2373833a1754f3d635399386 Mon Sep 17 00:00:00 2001 From: Jet Chiang Date: Wed, 18 Mar 2026 14:56:44 -0400 Subject: [PATCH 8/8] enum Signed-off-by: Jet Chiang --- docs/architecture/a2a-subagents.md | 4 ++-- go/api/database/models.go | 14 ++++++++++++-- go/api/httpapi/types.go | 9 ++++----- go/core/internal/database/client.go | 2 +- go/core/internal/database/fake/client.go | 6 +++--- .../kagent-adk/src/kagent/adk/_agent_executor.py | 2 +- .../kagent-adk/src/kagent/adk/_remote_a2a_tool.py | 6 +++--- .../kagent-core/src/kagent/core/a2a/_requests.py | 2 +- 8 files changed, 27 insertions(+), 18 deletions(-) diff --git a/docs/architecture/a2a-subagents.md b/docs/architecture/a2a-subagents.md index aa94d4b79..4f96283d3 100644 --- a/docs/architecture/a2a-subagents.md +++ b/docs/architecture/a2a-subagents.md @@ -36,7 +36,7 @@ On success, `run_async` returns: | Header | Value | Purpose | |---|---|---| | `x-user-id` | parent session's user ID | Scopes the subagent DB session to the same user | -| `x-kagent-source` | `"subagent"` | Hides the session from the agent's session history sidebar | +| `x-kagent-source` | `"agent"` | Hides the session from the agent's session history sidebar | > Interceptors must be passed to `ClientFactory.create(interceptors=[...])` — `A2AClient.add_request_middleware()` appends to a list that the transport never reads. @@ -56,6 +56,6 @@ When sending session requests to Go backend, take note that: | Session query | Includes subagent sessions? | |---|---| -| `GET /api/sessions/agent/{ns}/{name}` | No — filtered by `source != 'subagent'` | +| `GET /api/sessions/agent/{ns}/{name}` | No — filtered by `source != 'agent'` | | `GET /api/sessions/{id}` | Yes | | `GET /api/sessions/{id}/tasks` | Yes | diff --git a/go/api/database/models.go b/go/api/database/models.go index 16bb1a007..391c70e47 100644 --- a/go/api/database/models.go +++ b/go/api/database/models.go @@ -57,6 +57,16 @@ func ParseMessages(messages []Event) ([]*protocol.Message, error) { return result, nil } +// SessionSource represents the origin of a session. +type SessionSource string + +const ( + // SessionSourceUser indicates the session was initiated by a user. + SessionSourceUser SessionSource = "user" + // SessionSourceAgent indicates the session was created by a parent agent's A2A call. + SessionSourceAgent SessionSource = "agent" +) + type Session struct { ID string `gorm:"primaryKey;not null" json:"id"` Name *string `gorm:"index" json:"name,omitempty"` @@ -67,8 +77,8 @@ type Session struct { AgentID *string `gorm:"index" json:"agent_id"` // Source indicates how this session was created. - // nil or empty = user-initiated, "subagent" = created by a parent agent's A2A call. - Source *string `gorm:"index" json:"source,omitempty"` + // SessionSourceUser = user-initiated, SessionSourceAgent = created by a parent agent's A2A call. + Source *SessionSource `gorm:"index" json:"source,omitempty"` } type Task struct { diff --git a/go/api/httpapi/types.go b/go/api/httpapi/types.go index a657be0e2..5cc45cda9 100644 --- a/go/api/httpapi/types.go +++ b/go/api/httpapi/types.go @@ -100,11 +100,10 @@ type AgentResponse struct { // SessionRequest represents a session creation/update request type SessionRequest struct { - AgentRef *string `json:"agent_ref,omitempty"` - Name *string `json:"name,omitempty"` - ID *string `json:"id,omitempty"` - // Source indicates how this session was created (e.g. "subagent"). - Source *string `json:"source,omitempty"` + AgentRef *string `json:"agent_ref,omitempty"` + Name *string `json:"name,omitempty"` + ID *string `json:"id,omitempty"` + Source *database.SessionSource `json:"source,omitempty"` } // Run types diff --git a/go/core/internal/database/client.go b/go/core/internal/database/client.go index 62d20215e..8a38366a4 100644 --- a/go/core/internal/database/client.go +++ b/go/core/internal/database/client.go @@ -165,7 +165,7 @@ func (c *clientImpl) ListSessionsForAgent(ctx context.Context, agentID string, u var sessions []dbpkg.Session err := c.db.WithContext(ctx). Where("agent_id = ? AND user_id = ?", agentID, userID). - Where("source IS NULL OR source != ?", "subagent"). + Where("source IS NULL OR source != ?", dbpkg.SessionSourceAgent). Order("created_at ASC"). Find(&sessions).Error if err != nil { diff --git a/go/core/internal/database/fake/client.go b/go/core/internal/database/fake/client.go index 658f6d889..11157c478 100644 --- a/go/core/internal/database/fake/client.go +++ b/go/core/internal/database/fake/client.go @@ -336,7 +336,7 @@ func (c *InMemoryFakeClient) ListSessions(_ context.Context, userID string) ([]d return result, nil } -// ListSessionsForAgent lists all sessions for an agent, excluding subagent sessions. +// ListSessionsForAgent lists all sessions for an agent, excluding agent-initiated sessions. func (c *InMemoryFakeClient) ListSessionsForAgent(_ context.Context, agentID string, userID string) ([]database.Session, error) { c.mu.RLock() defer c.mu.RUnlock() @@ -344,8 +344,8 @@ func (c *InMemoryFakeClient) ListSessionsForAgent(_ context.Context, agentID str var result []database.Session for _, session := range c.sessions { if session.AgentID != nil && *session.AgentID == agentID && session.UserID == userID { - // Exclude subagent sessions from the listing - if session.Source != nil && *session.Source == "subagent" { + // Exclude agent-initiated sessions from the listing + if session.Source != nil && *session.Source == database.SessionSourceAgent { continue } result = append(result, *session) diff --git a/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py b/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py index 5577c8f50..726e5ad46 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py +++ b/python/packages/kagent-adk/src/kagent/adk/_agent_executor.py @@ -704,7 +704,7 @@ async def _prepare_session(self, context: RequestContext, run_args: dict[str, An break state: dict[str, Any] = {"session_name": session_name} - # Propagate source (e.g. "subagent") so the session is tagged in the DB. + # Propagate source (e.g. "agent") so the session is tagged in the DB. source = None if context.call_context and context.call_context.state: source = context.call_context.state.get("kagent_source") diff --git a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py index e94ca666e..84208f5ec 100644 --- a/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py +++ b/python/packages/kagent-adk/src/kagent/adk/_remote_a2a_tool.py @@ -58,14 +58,14 @@ _USER_ID_CONTEXT_KEY = "x-user-id" _SOURCE_HEADER = "x-kagent-source" -_SOURCE_SUBAGENT = "subagent" +_SOURCE_SUBAGENT = "agent" class _SubagentInterceptor(ClientCallInterceptor): """ Injects the authenticated user's ID as an ``x-user-id`` HTTP header and - marks the request as originating from a subagent call via - ``x-kagent-source: subagent`` on every outgoing A2A request. + marks the request as originating from an agent call via + ``x-kagent-source: agent`` on every outgoing A2A request. """ async def intercept(self, method_name, request_payload, http_kwargs, agent_card, context): diff --git a/python/packages/kagent-core/src/kagent/core/a2a/_requests.py b/python/packages/kagent-core/src/kagent/core/a2a/_requests.py index be9abe211..35a4e2670 100644 --- a/python/packages/kagent-core/src/kagent/core/a2a/_requests.py +++ b/python/packages/kagent-core/src/kagent/core/a2a/_requests.py @@ -48,7 +48,7 @@ async def build( if user_id: context.user = KAgentUser(user_id=user_id) # Propagate x-kagent-source so downstream code (e.g. session - # creation) can tag this session as subagent-originated. + # creation) can tag this session as agent-originated. source = headers.get("x-kagent-source", None) if source: context.state["kagent_source"] = source