diff --git a/Makefile b/Makefile index be9f7e49a..cdbaf690d 100644 --- a/Makefile +++ b/Makefile @@ -177,6 +177,7 @@ push-test-agent: buildx-create build-kagent-adk kubectl apply --namespace kagent --context kind-$(KIND_CLUSTER_NAME) -f go/core/test/e2e/agents/kebab/agent.yaml $(DOCKER_BUILDER) build --push $(BUILD_ARGS) $(TOOLS_IMAGE_BUILD_ARGS) -t $(DOCKER_REGISTRY)/poem-flow:latest -f python/samples/crewai/poem_flow/Dockerfile ./python $(DOCKER_BUILDER) build --push $(BUILD_ARGS) $(TOOLS_IMAGE_BUILD_ARGS) -t $(DOCKER_REGISTRY)/basic-openai:latest -f python/samples/openai/basic_agent/Dockerfile ./python + $(DOCKER_BUILDER) build --push $(BUILD_ARGS) $(TOOLS_IMAGE_BUILD_ARGS) -t $(DOCKER_REGISTRY)/langgraph-currency:latest -f python/samples/langgraph/currency/Dockerfile ./python .PHONY: push-test-skill push-test-skill: buildx-create diff --git a/go/core/test/e2e/invoke_api_test.go b/go/core/test/e2e/invoke_api_test.go index 8a605fb0f..6cc0c8aef 100644 --- a/go/core/test/e2e/invoke_api_test.go +++ b/go/core/test/e2e/invoke_api_test.go @@ -670,6 +670,43 @@ func generateOpenAIAgent(baseURL string) *v1alpha2.Agent { } } +func generateLangGraphAgent(baseURL string) *v1alpha2.Agent { + return &v1alpha2.Agent{ + ObjectMeta: metav1.ObjectMeta{ + Name: "currency-converter-test", + Namespace: "kagent", + }, + Spec: v1alpha2.AgentSpec{ + Description: "A currency converter LangGraph agent that can convert currencies", + Type: v1alpha2.AgentType_BYO, + BYO: &v1alpha2.BYOAgentSpec{ + Deployment: &v1alpha2.ByoDeploymentSpec{ + Image: "localhost:5001/langgraph-currency:latest", + SharedDeploymentSpec: v1alpha2.SharedDeploymentSpec{ + Env: []corev1.EnvVar{ + { + Name: "OPENAI_API_KEY", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "kagent-openai", + }, + Key: "OPENAI_API_KEY", + }, + }, + }, + { + Name: "OPENAI_API_BASE", + Value: baseURL + "/v1", + }, + }, + }, + }, + }, + }, + } +} + func generateCrewAIAgent(baseURL string) *v1alpha2.Agent { return &v1alpha2.Agent{ ObjectMeta: metav1.ObjectMeta{ @@ -716,13 +753,12 @@ func TestE2EInvokeOpenAIAgent(t *testing.T) { // Setup Kubernetes client cli := setupK8sClient(t, false) - // Setup specific resources - modelCfg := setupModelConfig(t, cli, baseURL) agent := generateOpenAIAgent(baseURL) // Create the agent on the cluster err := cli.Create(t.Context(), agent) require.NoError(t, err) + cleanup(t, cli, agent) // Wait for agent to be ready args := []string{ @@ -744,11 +780,6 @@ func TestE2EInvokeOpenAIAgent(t *testing.T) { // Poll until the A2A endpoint is actually serving requests through the proxy waitForEndpoint(t, agent.Namespace, agent.Name) - defer func() { - cli.Delete(t.Context(), agent) //nolint:errcheck - cli.Delete(t.Context(), modelCfg) //nolint:errcheck - }() - // Setup A2A client - use the agent's actual name a2aURL := a2aUrl("kagent", "basic-openai-test-agent") a2aClient, err := a2aclient.NewA2AClient(a2aURL) @@ -764,6 +795,69 @@ func TestE2EInvokeOpenAIAgent(t *testing.T) { }) } +func TestE2EInvokeLangGraphAgent(t *testing.T) { + baseURL, stopServer := setupMockServer(t, "mocks/invoke_langgraph_agent.json") + defer stopServer() + + cfg, err := config.GetConfig() + require.NoError(t, err) + + scheme := k8s_runtime.NewScheme() + err = v1alpha2.AddToScheme(scheme) + require.NoError(t, err) + err = corev1.AddToScheme(scheme) + require.NoError(t, err) + + cli, err := client.New(cfg, client.Options{ + Scheme: scheme, + }) + require.NoError(t, err) + + // Clean up any leftover agent from a previous failed run + _ = cli.Delete(t.Context(), &v1alpha2.Agent{ObjectMeta: metav1.ObjectMeta{Name: "currency-converter-test", Namespace: "kagent"}}) + + // Generate the LangGraph agent and inject the mock server's URL + agent := generateLangGraphAgent(baseURL) + + // Create the agent on the cluster + err = cli.Create(t.Context(), agent) + require.NoError(t, err) + cleanup(t, cli, agent) + + // Wait for the agent to become Ready + args := []string{ + "wait", + "--for", + "condition=Ready", + "--timeout=1m", + "agents.kagent.dev", + agent.Name, + "-n", + agent.Namespace, + } + + cmd := exec.CommandContext(t.Context(), "kubectl", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + require.NoError(t, cmd.Run()) + + // Poll until the A2A endpoint is actually serving requests through the proxy + waitForEndpoint(t, agent.Namespace, agent.Name) + + // Setup A2A client + a2aURL := a2aUrl(agent.Namespace, agent.Name) + a2aClient, err := a2aclient.NewA2AClient(a2aURL) + require.NoError(t, err) + + t.Run("sync_invocation", func(t *testing.T) { + runSyncTest(t, a2aClient, "What is the exchange rate from USD to EUR?", "0.92", nil) + }) + + t.Run("streaming_invocation", func(t *testing.T) { + runStreamingTest(t, a2aClient, "What is the exchange rate from USD to EUR?", "0.92") + }) +} + func TestE2EInvokeCrewAIAgent(t *testing.T) { mockllmCfg, err := mockllm.LoadConfigFromFile("mocks/invoke_crewai_agent.json", mocks) require.NoError(t, err) @@ -802,6 +896,7 @@ func TestE2EInvokeCrewAIAgent(t *testing.T) { // Create the agent on the cluster err = cli.Create(t.Context(), agent) require.NoError(t, err) + cleanup(t, cli, agent) // Wait for the agent to become Ready args := []string{ @@ -842,8 +937,6 @@ func TestE2EInvokeCrewAIAgent(t *testing.T) { t.Run("streaming_invocation", func(t *testing.T) { runStreamingTest(t, a2aClient, "Generate a poem about CrewAI", "CrewAI is awesome, it makes coding fun.") }) - - cli.Delete(t.Context(), agent) //nolint:errcheck } func TestE2EInvokeSTSIntegration(t *testing.T) { diff --git a/go/core/test/e2e/mocks/invoke_langgraph_agent.json b/go/core/test/e2e/mocks/invoke_langgraph_agent.json new file mode 100644 index 000000000..388f79117 --- /dev/null +++ b/go/core/test/e2e/mocks/invoke_langgraph_agent.json @@ -0,0 +1,66 @@ +{ + "openai": [ + { + "name": "exchange_rate_tool_call", + "match": { + "match_type": "contains", + "message": { + "content": "What is the exchange rate from USD to EUR?", + "role": "user" + } + }, + "response": { + "id": "chatcmpl-langgraph-1", + "object": "chat.completion", + "created": 1677652288, + "model": "gpt-4o-mini", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": null, + "tool_calls": [ + { + "id": "call_lg001", + "type": "function", + "function": { + "name": "_get_exchange_rate", + "arguments": "{\"currency_from\": \"USD\", \"currency_to\": \"EUR\", \"currency_date\": \"latest\"}" + } + } + ] + }, + "finish_reason": "tool_calls" + } + ] + } + }, + { + "name": "exchange_rate_tool_result", + "match": { + "match_type": "contains", + "message": { + "content": "USD", + "role": "tool" + } + }, + "response": { + "id": "chatcmpl-langgraph-2", + "object": "chat.completion", + "created": 1677652289, + "model": "gpt-4o-mini", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "The current exchange rate from USD to EUR is 0.92." + }, + "finish_reason": "stop" + } + ] + } + } + ] +} diff --git a/python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py b/python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py index 9d3642894..934ebb1ae 100644 --- a/python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py +++ b/python/packages/kagent-langgraph/src/kagent/langgraph/_executor.py @@ -7,6 +7,7 @@ import asyncio import logging import uuid +from collections.abc import Mapping from datetime import datetime from typing import Any @@ -25,9 +26,9 @@ from a2a.server.agent_execution import AgentExecutor from a2a.server.agent_execution.context import RequestContext from a2a.server.events.event_queue import EventQueue -from a2a.server.tasks import TaskStore from a2a.types import ( Artifact, + DataPart, Message, Part, Role, @@ -41,13 +42,17 @@ from pydantic import BaseModel from kagent.core.a2a import ( + A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY, + A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL, + A2A_DATA_PART_METADATA_TYPE_KEY, + KAGENT_HITL_DECISION_TYPE_BATCH, KAGENT_HITL_DECISION_TYPE_REJECT, TaskResultAggregator, - ToolApprovalRequest, + extract_ask_user_answers_from_message, + extract_batch_decisions_from_message, extract_decision_from_message, + extract_rejection_reasons_from_message, get_kagent_metadata_key, - handle_tool_approval_interrupt, - is_input_required_task, ) from kagent.core.tracing._span_processor import ( clear_kagent_span_attributes, @@ -170,7 +175,6 @@ async def _stream_graph_events( task_id=context.task_id, context_id=context.context_id, event_queue=event_queue, - task_store=context.task_store, ) # Interrupt detected - input_required event already sent, so return early return @@ -228,14 +232,16 @@ async def _handle_interrupt( task_id: str, context_id: str, event_queue: EventQueue, - task_store: TaskStore, ) -> None: """Handle interrupt from LangGraph and convert to A2A input_required event. - This is the LangGraph-specific adapter that extracts interrupt data from - LangGraph's format and delegates to the generic handler in kagent-core. + The BYO graph is expected to call ``interrupt()`` with a dict containing + ``action_requests`` -- a list of tool calls that need approval. + + This method converts them into ``DataPart`` objects with the same + ``adk_request_confirmation`` shape the ADK executor emits, so the + frontend can render them identically. """ - # Extract interrupt details from LangGraph format if not interrupt_data: logger.warning("Empty interrupt data received") return @@ -250,29 +256,71 @@ async def _handle_interrupt( logger.error(f"Unexpected interrupt data type: {type(first_item)}") return - # Extract LangGraph-specific fields action_requests_raw = interrupt_value.get("action_requests", []) - review_configs = interrupt_value.get("review_configs", []) - - # Convert to generic ToolApprovalRequest format - action_requests = [ - ToolApprovalRequest( - name=action.get("name", "unknown"), - args=action.get("args", {}), - id=action.get("id"), + if not action_requests_raw: + logger.warning("Interrupt has no action_requests, ignoring") + return + + # Build DataParts in the adk_request_confirmation wire format so the + # frontend renders tool-approval cards identically to the ADK executor. + parts: list[Part] = [] + for action in action_requests_raw: + if not isinstance(action, Mapping): + logger.warning( + "Skipping malformed action_request entry of type %s: %r", + type(action), + action, + ) + continue + tool_name = action["name"] + tool_args = action["args"] + tool_call_id = action["id"] + confirmation_id = str(uuid.uuid4()) + + parts.append( + Part( + DataPart( + data={ + "name": "adk_request_confirmation", + "id": confirmation_id, + "args": { + "originalFunctionCall": { + "name": tool_name, + "args": tool_args, + "id": tool_call_id, + }, + "toolConfirmation": { + "hint": f"Tool '{tool_name}' requires approval before execution.", + "confirmed": False, + "payload": None, + }, + }, + }, + metadata={ + get_kagent_metadata_key( + A2A_DATA_PART_METADATA_TYPE_KEY + ): A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL, + get_kagent_metadata_key(A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY): True, + }, + ) + ) + ) + + await event_queue.enqueue_event( + TaskStatusUpdateEvent( + task_id=task_id, + status=TaskStatus( + state=TaskState.input_required, + timestamp=datetime.now(UTC).isoformat(), + message=Message( + message_id=str(uuid.uuid4()), + role=Role.agent, + parts=parts, + ), + ), + context_id=context_id, + final=False, ) - for action in action_requests_raw - ] - - # Delegate to generic handler in kagent-core - await handle_tool_approval_interrupt( - action_requests=action_requests, - task_id=task_id, - context_id=context_id, - event_queue=event_queue, - task_store=task_store, - app_name=self.app_name, - review_configs=review_configs, ) @override @@ -282,15 +330,12 @@ async def cancel(self, context: RequestContext, event_queue: EventQueue): raise NotImplementedError("Cancellation is not implemented") def _is_resume_command(self, context: RequestContext) -> bool: - """Check if message is a resume command for an interrupted task. - - Uses generic utilities from kagent-core for decision extraction. - """ + """Check if message is a resume command for an interrupted task.""" # Must have an existing task in input_required state to resume if not context.current_task: return False - if not is_input_required_task(context.current_task.status.state): + if context.current_task.status.state != TaskState.input_required: return False # Check if message contains a decision @@ -302,7 +347,20 @@ async def _handle_resume( context: RequestContext, event_queue: EventQueue, ) -> None: - """Resume graph execution after interrupt with user decision.""" + """Resume graph execution after interrupt with user decision. + + Extracts the full HITL decision payload from the A2A message and + forwards it to the graph via ``Command(resume=...)``. The resume + value includes: + + - ``decision_type``: ``"approve"``, ``"reject"``, or ``"batch"`` + - ``decisions``: per-tool decisions when ``decision_type`` is ``"batch"`` + - ``rejection_reasons``: optional per-tool rejection reasons + - ``ask_user_answers``: optional answers when resuming an ``ask_user`` interrupt + + The BYO graph's interrupt handler is responsible for reading and + acting on these fields. + """ # Extract decision from message using core utility decision_type = extract_decision_from_message(context.message) @@ -322,12 +380,34 @@ async def _handle_resume( # Fallback to computing from context (same as initial) thread_id = getattr(context, "session_id", None) or context.context_id + # Build the resume payload with all available HITL data. + # The graph receives this as the return value of interrupt(). + resume_value: dict[str, Any] = {"decision_type": decision_type} + + if decision_type == KAGENT_HITL_DECISION_TYPE_BATCH: + batch_decisions = extract_batch_decisions_from_message(context.message) + if batch_decisions: + resume_value["decisions"] = batch_decisions + + rejection_reasons = extract_rejection_reasons_from_message(context.message) + if rejection_reasons: + resume_value["rejection_reasons"] = rejection_reasons + + ask_user_answers = extract_ask_user_answers_from_message(context.message) + if ask_user_answers: + resume_value["ask_user_answers"] = ask_user_answers + logger.info( - f"Resuming after interrupt - task_id={context.task_id}, thread_id={thread_id}, decision={decision_type}" + "Resuming after interrupt - task_id=%s, thread_id=%s, decision=%s, has_batch=%s, has_reasons=%s, has_answers=%s", + context.task_id, + thread_id, + decision_type, + "decisions" in resume_value, + "rejection_reasons" in resume_value, + "ask_user_answers" in resume_value, ) - # Create resume input - resume_input = Command(resume={"decisions": [{"type": decision_type}]}) + resume_input = Command(resume=resume_value) span_attributes = _convert_a2a_request_to_span_attributes(context) # Create graph config with explicit thread_id diff --git a/python/samples/langgraph/currency/agent.yaml b/python/samples/langgraph/currency/agent.yaml index 53b0b5c18..e6d9b990d 100644 --- a/python/samples/langgraph/currency/agent.yaml +++ b/python/samples/langgraph/currency/agent.yaml @@ -9,11 +9,11 @@ spec: deployment: image: localhost:5001/langgraph-currency:latest env: - - name: GOOGLE_API_KEY + - name: OPENAI_API_KEY valueFrom: secretKeyRef: - name: kagent-google - key: GOOGLE_API_KEY + name: kagent-openai + key: OPENAI_API_KEY # Enable OpenTelemetry tracing, make sure to set this according to your environment - name: OTEL_TRACING_ENABLED value: "true" diff --git a/python/samples/langgraph/currency/currency/agent.py b/python/samples/langgraph/currency/currency/agent.py index 70732d7fa..a7964cee5 100644 --- a/python/samples/langgraph/currency/currency/agent.py +++ b/python/samples/langgraph/currency/currency/agent.py @@ -4,7 +4,7 @@ from kagent.core import KAgentConfig from kagent.langgraph import KAgentCheckpointer from langchain_core.tools import tool -from langchain_google_genai import ChatGoogleGenerativeAI +from langchain_openai import ChatOpenAI from langgraph.prebuilt import create_react_agent from langsmith import traceable @@ -66,7 +66,7 @@ def _get_exchange_rate( ) graph = create_react_agent( - model=ChatGoogleGenerativeAI(model="gemini-2.0-flash"), + model=ChatOpenAI(model="gpt-4o-mini"), tools=[get_exchange_rate], checkpointer=kagent_checkpointer, prompt=SYSTEM_INSTRUCTION, diff --git a/python/samples/langgraph/currency/pyproject.toml b/python/samples/langgraph/currency/pyproject.toml index 039f2bd1c..dfb16ea86 100644 --- a/python/samples/langgraph/currency/pyproject.toml +++ b/python/samples/langgraph/currency/pyproject.toml @@ -8,7 +8,7 @@ dependencies = [ "kagent-langgraph", "langgraph>=0.2.0", "langchain-core>=0.3.0", - "langchain-google-genai>=1.0.0", + "langchain-openai>=0.3.0", "langgraph-checkpoint-sqlite>=2.0.0", "langsmith[otel]>=0.4.30", ] diff --git a/python/samples/langgraph/hitl-tools/Dockerfile b/python/samples/langgraph/hitl-tools/Dockerfile new file mode 100644 index 000000000..be728cf13 --- /dev/null +++ b/python/samples/langgraph/hitl-tools/Dockerfile @@ -0,0 +1,38 @@ +### STAGE 1: base image +ARG DOCKER_REGISTRY=ghcr.io +ARG VERSION=latest +FROM ghcr.io/astral-sh/uv:python3.13-trixie-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + +# Copy project files +COPY samples samples +COPY packages packages +COPY pyproject.toml pyproject.toml +COPY README.md README.md +COPY .python-version .python-version +COPY uv.lock uv.lock + +# Install dependencies +RUN uv sync --locked --no-dev \ + && uv cache clean + +# Set environment variables +ENV PYTHONPATH=/app +ENV PORT=8080 + +# Expose port +EXPOSE 8080 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8080/health || exit 1 + +# Run the application +CMD ["uv", "run", "samples/langgraph/hitl-tools/hitl_tools/cli.py"] diff --git a/python/samples/langgraph/hitl-tools/agent.yaml b/python/samples/langgraph/hitl-tools/agent.yaml new file mode 100644 index 000000000..3d3e7fa29 --- /dev/null +++ b/python/samples/langgraph/hitl-tools/agent.yaml @@ -0,0 +1,16 @@ +apiVersion: kagent.dev/v1alpha2 +kind: Agent +metadata: + name: hitl-tools-agent +spec: + description: A LangGraph agent demonstrating HITL tool approval with safe and dangerous tools + type: BYO + byo: + deployment: + image: localhost:5001/langgraph-hitl-tools:latest + env: + - name: OPENAI_API_KEY + valueFrom: + secretKeyRef: + name: kagent-openai + key: OPENAI_API_KEY diff --git a/python/samples/langgraph/hitl-tools/hitl_tools/__init__.py b/python/samples/langgraph/hitl-tools/hitl_tools/__init__.py new file mode 100644 index 000000000..02c597e11 --- /dev/null +++ b/python/samples/langgraph/hitl-tools/hitl_tools/__init__.py @@ -0,0 +1 @@ +from . import agent diff --git a/python/samples/langgraph/hitl-tools/hitl_tools/agent-card.json b/python/samples/langgraph/hitl-tools/hitl_tools/agent-card.json new file mode 100644 index 000000000..e9916d664 --- /dev/null +++ b/python/samples/langgraph/hitl-tools/hitl_tools/agent-card.json @@ -0,0 +1,19 @@ +{ + "name": "hitl-tools-agent", + "description": "A LangGraph agent demonstrating HITL tool approval. Has a safe tool (get_time) and a dangerous tool (delete_file) that requires human approval.", + "url": "localhost:8080", + "version": "0.1.0", + "capabilities": { + "streaming": true + }, + "defaultInputModes": ["text"], + "defaultOutputModes": ["text"], + "skills": [ + { + "id": "hitl-tools", + "name": "HITL Tools Demo", + "description": "Demonstrates tool approval flow with safe and dangerous tools", + "tags": ["hitl", "tools", "approval"] + } + ] +} diff --git a/python/samples/langgraph/hitl-tools/hitl_tools/agent.py b/python/samples/langgraph/hitl-tools/hitl_tools/agent.py new file mode 100644 index 000000000..dd4d333e5 --- /dev/null +++ b/python/samples/langgraph/hitl-tools/hitl_tools/agent.py @@ -0,0 +1,177 @@ +"""LangGraph agent demonstrating HITL tool approval. + +This sample builds a custom ReAct-style graph with two tools: + - get_time: safe, runs without approval + - delete_file: dangerous, requires human approval via interrupt() + +The graph intercepts tool calls and checks whether they need approval. +If so, it calls interrupt() with the action_requests format that the +kagent LangGraph executor expects, pausing execution until the user +approves or rejects. + +On resume, the executor passes the user's decision back via +Command(resume=...) and the graph reads it to proceed or skip. +""" + +import logging +from datetime import datetime +from typing import Annotated, Any + +import httpx +from kagent.core import KAgentConfig +from kagent.langgraph import KAgentCheckpointer +from langchain_core.messages import AIMessage, ToolMessage +from langchain_core.tools import tool +from langchain_openai import ChatOpenAI +from langgraph.graph import END, START, StateGraph +from langgraph.graph.message import add_messages +from langgraph.types import interrupt +from typing_extensions import TypedDict + +logger = logging.getLogger(__name__) + +kagent_checkpointer = KAgentCheckpointer( + client=httpx.AsyncClient(base_url=KAgentConfig().url), + app_name=KAgentConfig().app_name, +) + +# -- Tools ------------------------------------------------------------------- + +# Tools that require human approval before execution. +TOOLS_REQUIRING_APPROVAL = {"delete_file"} + + +@tool +def get_time() -> str: + """Get the current date and time. This is a safe tool that runs without approval.""" + return datetime.now().isoformat() + + +@tool +def delete_file(path: str) -> str: + """Delete a file at the given path. This is a dangerous operation that requires human approval. + + Args: + path: The file path to delete. + """ + # In a real agent this would actually delete the file. + # For this demo we just pretend. + return f"File '{path}' has been deleted." + + +ALL_TOOLS = [get_time, delete_file] +TOOL_MAP = {t.name: t for t in ALL_TOOLS} + +# -- Graph state ------------------------------------------------------------- + + +class AgentState(TypedDict): + messages: Annotated[list, add_messages] + + +# -- Graph nodes -------------------------------------------------------------- + +llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(ALL_TOOLS) + + +async def call_model(state: AgentState) -> dict[str, Any]: + """Call the LLM with the current messages.""" + response = await llm.ainvoke(state["messages"]) + return {"messages": [response]} + + +async def run_tools(state: AgentState) -> dict[str, Any]: + """Execute tool calls, requesting approval for dangerous tools via interrupt(). + + For each tool call in the last AI message: + - If the tool is in TOOLS_REQUIRING_APPROVAL, call interrupt() with the + action_requests format the kagent executor expects. The executor + converts this into an A2A input_required event so the frontend shows + approve / reject buttons. + - If approved (or no approval needed), execute the tool normally. + - If rejected, return a message telling the LLM the tool was rejected. + """ + last_message = state["messages"][-1] + assert isinstance(last_message, AIMessage) and last_message.tool_calls + + results: list[ToolMessage] = [] + + for tool_call in last_message.tool_calls: + tool_name = tool_call["name"] + tool_args = tool_call["args"] + tool_call_id = tool_call["id"] + + if tool_name in TOOLS_REQUIRING_APPROVAL: + # Pause execution and ask the user for approval. + # The executor reads "action_requests" from the interrupt value + # and emits an adk_request_confirmation DataPart to the frontend. + decision = interrupt( + { + "action_requests": [ + { + "name": tool_name, + "args": tool_args, + "id": tool_call_id, + } + ] + } + ) + + # The executor resumes with a dict like: + # {"decision_type": "approve"} + # {"decision_type": "reject", "rejection_reasons": {"*": "Too risky"}} + decision_type = decision.get("decision_type", "reject") if isinstance(decision, dict) else "reject" + + if decision_type != "approve": + reason = "" + if isinstance(decision, dict): + reasons = decision.get("rejection_reasons", {}) + reason = reasons.get("*", "") if isinstance(reasons, dict) else "" + rejection_msg = "Tool call was rejected by user." + if reason: + rejection_msg += f" Reason: {reason}" + results.append( + ToolMessage( + content=rejection_msg, + tool_call_id=tool_call_id, + name=tool_name, + ) + ) + continue + + # Execute the tool (either no approval needed, or approved). + tool_fn = TOOL_MAP[tool_name] + result = await tool_fn.ainvoke(tool_args) + results.append( + ToolMessage( + content=str(result), + tool_call_id=tool_call_id, + name=tool_name, + ) + ) + + return {"messages": results} + + +# -- Routing ------------------------------------------------------------------ + + +def should_continue(state: AgentState) -> str: + """Route to tools if the last message has tool calls, otherwise end.""" + last_message = state["messages"][-1] + if isinstance(last_message, AIMessage) and last_message.tool_calls: + return "tools" + return END + + +# -- Build graph -------------------------------------------------------------- + +builder = StateGraph(AgentState) +builder.add_node("agent", call_model) +builder.add_node("tools", run_tools) + +builder.add_edge(START, "agent") +builder.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END}) +builder.add_edge("tools", "agent") + +graph = builder.compile(checkpointer=kagent_checkpointer) diff --git a/python/samples/langgraph/hitl-tools/hitl_tools/cli.py b/python/samples/langgraph/hitl-tools/hitl_tools/cli.py new file mode 100644 index 000000000..c2d0b7e72 --- /dev/null +++ b/python/samples/langgraph/hitl-tools/hitl_tools/cli.py @@ -0,0 +1,38 @@ +"""CLI for the HITL tools LangGraph agent.""" + +import json +import logging +import os + +import uvicorn +from agent import graph +from kagent.core import KAgentConfig +from kagent.langgraph import KAgentApp + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") + +logger = logging.getLogger(__name__) + + +def main(): + """Main entry point for the CLI.""" + with open(os.path.join(os.path.dirname(__file__), "agent-card.json"), "r") as f: + agent_card = json.load(f) + + config = KAgentConfig() + app = KAgentApp(graph=graph, agent_card=agent_card, config=config, tracing=True) + + port = int(os.getenv("PORT", "8080")) + host = os.getenv("HOST", "0.0.0.0") + logger.info(f"Starting server on {host}:{port}") + + uvicorn.run( + app.build(), + host=host, + port=port, + log_level="info", + ) + + +if __name__ == "__main__": + main() diff --git a/python/samples/langgraph/hitl-tools/pyproject.toml b/python/samples/langgraph/hitl-tools/pyproject.toml new file mode 100644 index 000000000..4217459b4 --- /dev/null +++ b/python/samples/langgraph/hitl-tools/pyproject.toml @@ -0,0 +1,24 @@ +[project] +name = "hitl-tools" +version = "0.1.0" +description = "LangGraph agent demonstrating HITL tool approval" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "kagent-langgraph", + "langgraph>=0.2.0", + "langchain-core>=0.3.0", + "langchain-openai>=0.3.0", + "langgraph-checkpoint-sqlite>=2.0.0", + "langsmith[otel]>=0.4.30", +] + +[build-system] +requires = ["setuptools>=61.0", "wheel>=0.46.2"] +build-backend = "setuptools.build_meta" + +[project.scripts] +hitl-tools = "hitl_tools.cli:main" + +[tool.uv.sources] +kagent-langgraph = { workspace = true } diff --git a/python/uv.lock b/python/uv.lock index b4d8c5903..3997b9ef8 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -14,6 +14,7 @@ members = [ "basic", "basic-openai-agent", "currency", + "hitl-tools", "kagent-adk", "kagent-core", "kagent-crewai", @@ -917,7 +918,7 @@ source = { editable = "samples/langgraph/currency" } dependencies = [ { name = "kagent-langgraph" }, { name = "langchain-core" }, - { name = "langchain-google-genai" }, + { name = "langchain-openai" }, { name = "langgraph" }, { name = "langgraph-checkpoint-sqlite" }, { name = "langsmith", extra = ["otel"] }, @@ -927,7 +928,7 @@ dependencies = [ requires-dist = [ { name = "kagent-langgraph", editable = "packages/kagent-langgraph" }, { name = "langchain-core", specifier = ">=0.3.0" }, - { name = "langchain-google-genai", specifier = ">=1.0.0" }, + { name = "langchain-openai", specifier = ">=0.3.0" }, { name = "langgraph", specifier = ">=0.2.0" }, { name = "langgraph-checkpoint-sqlite", specifier = ">=2.0.0" }, { name = "langsmith", extras = ["otel"], specifier = ">=0.4.30" }, @@ -1111,15 +1112,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b5/36/7fb70f04bf00bc646cd5bb45aa9eddb15e19437a28b8fb2b4a5249fac770/filelock-3.20.3-py3-none-any.whl", hash = "sha256:4b0dda527ee31078689fc205ec4f1c1bf7d56cf88b6dc9426c4f230e46c2dce1", size = 16701, upload-time = "2026-01-09T17:55:04.334Z" }, ] -[[package]] -name = "filetype" -version = "1.2.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/bb/29/745f7d30d47fe0f251d3ad3dc2978a23141917661998763bebb6da007eb1/filetype-1.2.0.tar.gz", hash = "sha256:66b56cd6474bf41d8c54660347d37afcc3f7d1970648de365c102ef77548aadb", size = 998020, upload-time = "2022-11-02T17:34:04.141Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/18/79/1b8fa1bb3568781e84c9200f951c735f3f157429f44be0495da55894d620/filetype-1.2.0-py2.py3-none-any.whl", hash = "sha256:7ce71b6880181241cf7ac8697a2f1eb6a8bd9b429f7ad6d27b8db9ba5f1c2d25", size = 19970, upload-time = "2022-11-02T17:34:01.425Z" }, -] - [[package]] name = "flatbuffers" version = "25.9.23" @@ -1919,6 +1911,29 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f0/55/ef77a85ee443ae05a9e9cba1c9f0dd9241eb42da2aeba1dc50f51154c81a/hf_xet-1.1.5-cp37-abi3-win_amd64.whl", hash = "sha256:73e167d9807d166596b4b2f0b585c6d5bd84a26dea32843665a8b58f6edba245", size = 2738931, upload-time = "2025-06-20T21:48:39.482Z" }, ] +[[package]] +name = "hitl-tools" +version = "0.1.0" +source = { editable = "samples/langgraph/hitl-tools" } +dependencies = [ + { name = "kagent-langgraph" }, + { name = "langchain-core" }, + { name = "langchain-openai" }, + { name = "langgraph" }, + { name = "langgraph-checkpoint-sqlite" }, + { name = "langsmith", extra = ["otel"] }, +] + +[package.metadata] +requires-dist = [ + { name = "kagent-langgraph", editable = "packages/kagent-langgraph" }, + { name = "langchain-core", specifier = ">=0.3.0" }, + { name = "langchain-openai", specifier = ">=0.3.0" }, + { name = "langgraph", specifier = ">=0.2.0" }, + { name = "langgraph-checkpoint-sqlite", specifier = ">=2.0.0" }, + { name = "langsmith", extras = ["otel"], specifier = ">=0.4.30" }, +] + [[package]] name = "httpcore" version = "1.0.9" @@ -2636,18 +2651,17 @@ wheels = [ ] [[package]] -name = "langchain-google-genai" -version = "4.2.0" +name = "langchain-openai" +version = "1.1.7" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "filetype" }, - { name = "google-genai" }, { name = "langchain-core" }, - { name = "pydantic" }, + { name = "openai" }, + { name = "tiktoken" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/d8/0b/eae2305e207574dc633983a8a82a745e0ede1bce1f3a9daff24d2341fadc/langchain_google_genai-4.2.0.tar.gz", hash = "sha256:9a8d9bfc35354983ed29079cefff53c3e7c9c2a44b6ba75cc8f13a0cf8b55c33", size = 277361, upload-time = "2026-01-13T20:41:17.63Z" } +sdist = { url = "https://files.pythonhosted.org/packages/38/b7/30bfc4d1b658a9ee524bcce3b0b2ec9c45a11c853a13c4f0c9da9882784b/langchain_openai-1.1.7.tar.gz", hash = "sha256:f5ec31961ed24777548b63a5fe313548bc6e0eb9730d6552b8c6418765254c81", size = 1039134, upload-time = "2026-01-07T19:44:59.728Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/22/51/39942c0083139652494bb354dddf0ed397703a4882302f7b48aeca531c96/langchain_google_genai-4.2.0-py3-none-any.whl", hash = "sha256:856041aaafceff65a4ef0d5acf5731f2db95229ff041132af011aec51e8279d9", size = 66452, upload-time = "2026-01-13T20:41:16.296Z" }, + { url = "https://files.pythonhosted.org/packages/64/a1/50e7596aca775d8c3883eceeaf47489fac26c57c1abe243c00174f715a8a/langchain_openai-1.1.7-py3-none-any.whl", hash = "sha256:34e9cd686aac1a120d6472804422792bf8080a2103b5d21ee450c9e42d053815", size = 84753, upload-time = "2026-01-07T19:44:58.629Z" }, ] [[package]]