diff --git a/__tests__/components/conversation-events/chat/event-content-helpers/should-render-event.test.ts b/__tests__/components/conversation-events/chat/event-content-helpers/should-render-event.test.ts index 34f825360..63f1aa9c2 100644 --- a/__tests__/components/conversation-events/chat/event-content-helpers/should-render-event.test.ts +++ b/__tests__/components/conversation-events/chat/event-content-helpers/should-render-event.test.ts @@ -7,6 +7,7 @@ import { createUserMessageEvent, } from "test-utils"; import { ACPToolCallEvent } from "#/types/agent-server/core/events/acp-tool-call-event"; +import { StreamingDeltaEvent } from "#/types/agent-server/core/events/streaming-delta-event"; import { ActionEvent, ObservationEvent, @@ -96,6 +97,46 @@ describe("shouldRenderEvent - ACPToolCallEvent", () => { }); }); +describe("shouldRenderEvent - StreamingDeltaEvent", () => { + const makeStreamingDelta = ( + overrides: Partial<StreamingDeltaEvent> = {}, + ): StreamingDeltaEvent => ({ + id: "delta-1", + kind: "StreamingDeltaEvent", + timestamp: "2024-01-01T00:00:00Z", + source: "agent", + content: "I'll start working on that.", + reasoning_content: null, + ...overrides, + }); + + it("renders text deltas", () => { + expect(shouldRenderEvent(makeStreamingDelta())).toBe(true); + }); + + it("renders reasoning-only deltas", () => { + expect( + shouldRenderEvent( + makeStreamingDelta({ + content: null, + reasoning_content: "thinking", + }), + ), + ).toBe(true); + }); + + it("hides empty deltas", () => { + expect( + shouldRenderEvent( + makeStreamingDelta({ + content: null, + reasoning_content: null, + }), + ), + ).toBe(false); + }); +}); + describe("shouldRenderEvent - SwitchLLM", () => { const switchAction: ActionEvent = { id: "switch-action", diff --git a/__tests__/utils/handle-event-for-ui.test.ts b/__tests__/utils/handle-event-for-ui.test.ts index 60e74dd53..bccfe72d2 100644 --- a/__tests__/utils/handle-event-for-ui.test.ts +++ b/__tests__/utils/handle-event-for-ui.test.ts @@ -7,6 +7,7 @@ 
import { OpenHandsEvent, } from "#/types/agent-server/core"; import { ACPToolCallEvent } from "#/types/agent-server/core/events/acp-tool-call-event"; +import { StreamingDeltaEvent } from "#/types/agent-server/core/events/streaming-delta-event"; import { handleEventForUI } from "#/utils/handle-event-for-ui"; describe("handleEventForUI", () => { @@ -76,6 +77,56 @@ describe("handleEventForUI", () => { extended_content: [], }; + const mockFinishActionEvent: ActionEvent = { + id: "test-finish-action-1", + timestamp: Date.now().toString(), + source: "agent", + thought: [], + thinking_blocks: [], + action: { + kind: "FinishAction", + message: "I'll start working on that. Done.", + }, + tool_name: "finish", + tool_call_id: "call_finish_1", + tool_call: { + id: "call_finish_1", + type: "function", + function: { + name: "finish", + arguments: JSON.stringify({ + message: "I'll start working on that. Done.", + }), + }, + }, + llm_response_id: "response_finish", + security_risk: SecurityRisk.UNKNOWN, + }; + + const mockAgentMessageEvent: MessageEvent = { + id: "test-agent-message-1", + timestamp: Date.now().toString(), + source: "agent", + llm_message: { + role: "assistant", + content: [{ type: "text", text: "I'll start working on that. Done." 
}], + }, + activated_microagents: [], + extended_content: [], + }; + + const makeStreamingDelta = ( + id: string, + content: string | null, + ): StreamingDeltaEvent => ({ + id, + kind: "StreamingDeltaEvent", + timestamp: Date.now().toString(), + source: "agent", + content, + reasoning_content: null, + }); + it("should add non-observation events to the end of uiEvents", () => { const initialUiEvents = [mockMessageEvent]; const result = handleEventForUI(mockActionEvent, initialUiEvents); @@ -241,6 +292,153 @@ describe("handleEventForUI", () => { }); }); + describe("StreamingDeltaEvent", () => { + it("merges consecutive deltas into a single provisional assistant event", () => { + const first = makeStreamingDelta("delta-1", "I'll start "); + const second = makeStreamingDelta("delta-2", "working on that."); + + const afterFirst = handleEventForUI(first, [mockMessageEvent]); + const afterSecond = handleEventForUI(second, afterFirst); + + expect(afterSecond).toEqual([ + mockMessageEvent, + { + ...second, + content: "I'll start working on that.", + reasoning_content: null, + }, + ]); + }); + + it("finalizes streamed deltas in place when finish arrives", () => { + const first = makeStreamingDelta("delta-1", "I'll start "); + const second = makeStreamingDelta("delta-2", "working on that."); + const streamedDelta = handleEventForUI( + second, + handleEventForUI(first, []), + ).at(-1)!; + const uiEvents = [mockMessageEvent, streamedDelta]; + + const result = handleEventForUI(mockFinishActionEvent, uiEvents); + + expect(result).toEqual([ + mockMessageEvent, + { + ...streamedDelta, + content: "I'll start working on that. 
Done.", + }, + ]); + }); + + it("finalizes streamed deltas in place when an agent message arrives", () => { + const first = makeStreamingDelta("delta-1", "I'll start "); + const second = makeStreamingDelta("delta-2", "working on that."); + const streamedDelta = handleEventForUI( + second, + handleEventForUI(first, []), + ).at(-1)!; + const uiEvents = [mockMessageEvent, streamedDelta]; + + const result = handleEventForUI(mockAgentMessageEvent, uiEvents); + + expect(result).toEqual([ + mockMessageEvent, + { + ...streamedDelta, + content: "I'll start working on that. Done.", + }, + ]); + }); + + it("keeps streamed deltas in their original locations when the final message aggregates them", () => { + const first = makeStreamingDelta( + "delta-1", + "I'll start working on that.", + ); + const second = makeStreamingDelta("delta-2", "I found the issue."); + const aggregateAgentMessage: MessageEvent = { + ...mockAgentMessageEvent, + llm_message: { + role: "assistant", + content: [ + { + type: "text", + text: "I'll start working on that.I found the issue.", + }, + ], + }, + }; + + const afterFirst = handleEventForUI(first, [mockMessageEvent]); + const afterObservation = handleEventForUI(mockObservationEvent, afterFirst); + const afterSecond = handleEventForUI(second, afterObservation); + const result = handleEventForUI(aggregateAgentMessage, afterSecond); + + expect(result).toEqual([ + mockMessageEvent, + first, + mockObservationEvent, + second, + ]); + }); + + it("appends a distinct final message that does not match streamed text", () => { + const streamedDelta = makeStreamingDelta( + "delta-1", + "I'll start working on that.", + ); + const finalMessage: MessageEvent = { + ...mockAgentMessageEvent, + llm_message: { + role: "assistant", + content: [{ type: "text", text: "Done." 
}], + }, + }; + + const result = handleEventForUI(finalMessage, [ + mockMessageEvent, + streamedDelta, + ]); + + expect(result).toEqual([mockMessageEvent, streamedDelta, finalMessage]); + }); + + it("keeps deltas from older turns when a later turn finishes", () => { + const oldUserMessage: MessageEvent = { + ...mockMessageEvent, + id: "old-user-message", + }; + const nextUserMessage: MessageEvent = { + ...mockMessageEvent, + id: "next-user-message", + llm_message: { + role: "user", + content: [{ type: "text", text: "Next task" }], + }, + }; + const oldDelta = makeStreamingDelta("old-delta", "Old live text"); + const currentDelta = makeStreamingDelta( + "current-delta", + "Current live text", + ); + + const result = handleEventForUI(mockFinishActionEvent, [ + oldUserMessage, + oldDelta, + nextUserMessage, + currentDelta, + ]); + + expect(result).toEqual([ + oldUserMessage, + oldDelta, + nextUserMessage, + currentDelta, + mockFinishActionEvent, + ]); + }); + }); + it("should NOT add ThinkObservation even when ThinkAction is not found", () => { const mockThinkObservation: ObservationEvent = { id: "test-think-observation-1", diff --git a/src/components/conversation-events/chat/event-content-helpers/should-render-event.ts b/src/components/conversation-events/chat/event-content-helpers/should-render-event.ts index 18efbbb4e..46fe34a80 100644 --- a/src/components/conversation-events/chat/event-content-helpers/should-render-event.ts +++ b/src/components/conversation-events/chat/event-content-helpers/should-render-event.ts @@ -7,6 +7,7 @@ import { isConversationStateUpdateEvent, isHookExecutionEvent, isACPToolCallEvent, + isStreamingDeltaEvent, } from "#/types/agent-server/type-guards"; export const shouldRenderEvent = (event: OpenHandsEvent) => { @@ -89,6 +90,10 @@ export const shouldRenderEvent = (event: OpenHandsEvent) => { return event.status === "completed" || event.status === "failed"; } + if (isStreamingDeltaEvent(event)) { + return event.content !== null || 
event.reasoning_content !== null; + } + // Don't render any other event types (system events, etc.) return false; }; diff --git a/src/components/conversation-events/chat/event-message.tsx b/src/components/conversation-events/chat/event-message.tsx index d9051437c..f32185096 100644 --- a/src/components/conversation-events/chat/event-message.tsx +++ b/src/components/conversation-events/chat/event-message.tsx @@ -16,11 +16,13 @@ import { isPlanningFileEditorObservationEvent, isHookExecutionEvent, isACPToolCallEvent, + isStreamingDeltaEvent, } from "#/types/agent-server/type-guards"; import { useConfig } from "#/hooks/query/use-config"; import { useConversationStore } from "#/stores/conversation-store"; import { useAgentState } from "#/hooks/use-agent-state"; import { AgentState } from "#/types/agent-state"; +import { ChatMessage } from "#/components/features/chat/chat-message"; import { PlanPreview } from "../../features/chat/plan-preview"; import { ErrorEventMessage } from "./event-message-components/error-event-message"; import { UserAssistantEventMessage } from "./event-message-components/user-assistant-event-message"; @@ -176,6 +178,23 @@ export function EventMessage({ ); } + if (isStreamingDeltaEvent(event)) { + const content = event.content ?? ""; + const reasoningContent = event.reasoning_content ?? 
""; + return ( + <> + {reasoningContent && } + {content && ( + + )} + + ); + } + // Finish actions if (isActionEvent(event) && event.action.kind === "FinishAction") { return ( diff --git a/src/types/agent-server/core/events/index.ts b/src/types/agent-server/core/events/index.ts index 08740d113..f74a1fcb2 100644 --- a/src/types/agent-server/core/events/index.ts +++ b/src/types/agent-server/core/events/index.ts @@ -7,4 +7,5 @@ export * from "./hook-execution-event"; export * from "./message-event"; export * from "./observation-event"; export * from "./pause-event"; +export * from "./streaming-delta-event"; export * from "./system-event"; diff --git a/src/types/agent-server/core/events/streaming-delta-event.ts b/src/types/agent-server/core/events/streaming-delta-event.ts new file mode 100644 index 000000000..f5c4221da --- /dev/null +++ b/src/types/agent-server/core/events/streaming-delta-event.ts @@ -0,0 +1,8 @@ +import { BaseEvent } from "../base/event"; + +export interface StreamingDeltaEvent extends BaseEvent { + kind: "StreamingDeltaEvent"; + source: "agent"; + content: string | null; + reasoning_content: string | null; +} diff --git a/src/types/agent-server/core/openhands-event.ts b/src/types/agent-server/core/openhands-event.ts index 4eaa25050..dcf2ddc36 100644 --- a/src/types/agent-server/core/openhands-event.ts +++ b/src/types/agent-server/core/openhands-event.ts @@ -15,6 +15,7 @@ import { HookExecutionEvent, PauseEvent, ServerErrorEvent, + StreamingDeltaEvent, } from "./events/index"; /** @@ -41,4 +42,5 @@ export type OpenHandsEvent = | ConversationErrorEvent // Control events | PauseEvent - | ServerErrorEvent; + | ServerErrorEvent + | StreamingDeltaEvent; diff --git a/src/types/agent-server/type-guards.ts b/src/types/agent-server/type-guards.ts index c65143657..5c5d91aba 100644 --- a/src/types/agent-server/type-guards.ts +++ b/src/types/agent-server/type-guards.ts @@ -25,6 +25,7 @@ import { } from "./core/events/conversation-state-event"; import { 
HookExecutionEvent } from "./core/events/hook-execution-event"; import { ACPToolCallEvent } from "./core/events/acp-tool-call-event"; +import { StreamingDeltaEvent } from "./core/events/streaming-delta-event"; import { SystemPromptEvent } from "./core/events/system-event"; /** @@ -249,6 +250,11 @@ export const isACPToolCallEvent = ( ): event is ACPToolCallEvent => "kind" in event && event.kind === "ACPToolCallEvent"; +export const isStreamingDeltaEvent = ( + event: OpenHandsEvent, +): event is StreamingDeltaEvent => + "kind" in event && event.kind === "StreamingDeltaEvent"; + // ============================================================================= // COMPATIBILITY TYPE GUARDS // ============================================================================= diff --git a/src/utils/handle-event-for-ui.ts b/src/utils/handle-event-for-ui.ts index 8298c7dff..e70159ca9 100644 --- a/src/utils/handle-event-for-ui.ts +++ b/src/utils/handle-event-for-ui.ts @@ -1,8 +1,135 @@ -import { OpenHandsEvent } from "#/types/agent-server/core"; +import { MessageEvent, OpenHandsEvent } from "#/types/agent-server/core"; import { isACPToolCallEvent, + isActionEvent, + isMessageEvent, isObservationEvent, + isStreamingDeltaEvent, } from "#/types/agent-server/type-guards"; +import { StreamingDeltaEvent } from "#/types/agent-server/core/events/streaming-delta-event"; + +const mergeStreamingDeltaEvent = ( + incoming: StreamingDeltaEvent, + existing: StreamingDeltaEvent, +): StreamingDeltaEvent => ({ + ...incoming, + content: `${existing.content ?? ""}${incoming.content ?? ""}` || null, + reasoning_content: + `${existing.reasoning_content ?? ""}${incoming.reasoning_content ?? ""}` || + null, +}); + +const appendContentToStreamingDeltaEvent = ( + existing: StreamingDeltaEvent, + content: string, +): StreamingDeltaEvent => ({ + ...existing, + content: `${existing.content ?? 
""}${content}` || null, +}); + +const findLastUserMessageIndex = (events: OpenHandsEvent[]): number => { + for (let index = events.length - 1; index >= 0; index -= 1) { + const event = events[index]; + if (isMessageEvent(event) && event.source === "user") { + return index; + } + } + return -1; +}; + +const getAgentMessageText = (event: MessageEvent): string => + event.llm_message.content + .filter((content) => content.type === "text") + .map((content) => content.text) + .join("\n"); + +const getFinalAgentText = (event: OpenHandsEvent): string | null => { + if (isActionEvent(event) && event.action.kind === "FinishAction") { + return event.action.message; + } + + if (isMessageEvent(event) && event.source === "agent") { + return getAgentMessageText(event); + } + + return null; +}; + +const findTextSegmentsInOrder = ( + text: string, + segments: string[], +): { matched: boolean; lastMatchEnd: number } => { + let searchStart = 0; + let lastMatchEnd = 0; + + for (const segment of segments) { + const index = text.indexOf(segment, searchStart); + if (index === -1) { + return { matched: false, lastMatchEnd }; + } + lastMatchEnd = index + segment.length; + searchStart = lastMatchEnd; + } + + return { matched: true, lastMatchEnd }; +}; + +const finalizeStreamingDeltasInPlace = ( + finalEvent: OpenHandsEvent, + uiEvents: OpenHandsEvent[], +): OpenHandsEvent[] | null => { + const lastUserMessageIndex = findLastUserMessageIndex(uiEvents); + const currentTurnStreamingDeltaIndexes = uiEvents + .map((uiEvent, index) => ({ uiEvent, index })) + .filter( + ({ uiEvent, index }) => + index > lastUserMessageIndex && isStreamingDeltaEvent(uiEvent), + ) + .map(({ index }) => index); + + if (currentTurnStreamingDeltaIndexes.length === 0) { + return null; + } + + const finalText = getFinalAgentText(finalEvent); + const streamingSegments = currentTurnStreamingDeltaIndexes + .map((index) => uiEvents[index]) + .filter(isStreamingDeltaEvent) + .map((uiEvent) => uiEvent.content ?? 
"") + .filter((content) => content.length > 0); + + if (!finalText || streamingSegments.length === 0) { + return null; + } + + const nextUiEvents = [...uiEvents]; + const streamedText = streamingSegments.join(""); + let unstreamedSuffix = ""; + + if (finalText.startsWith(streamedText)) { + unstreamedSuffix = finalText.slice(streamedText.length); + } else { + const match = findTextSegmentsInOrder(finalText, streamingSegments); + if (!match.matched) { + return null; + } + unstreamedSuffix = finalText.slice(match.lastMatchEnd); + } + + const lastDeltaIndex = + currentTurnStreamingDeltaIndexes[ + currentTurnStreamingDeltaIndexes.length - 1 + ]; + const lastDelta = nextUiEvents[lastDeltaIndex]; + if (unstreamedSuffix && isStreamingDeltaEvent(lastDelta)) { + nextUiEvents[lastDeltaIndex] = appendContentToStreamingDeltaEvent( + lastDelta, + unstreamedSuffix, + ); + } + + return nextUiEvents; +}; /** * Handles adding an event to the UI events array @@ -19,6 +146,35 @@ export const handleEventForUI = ( ): OpenHandsEvent[] => { const newUiEvents = [...uiEvents]; + if (isStreamingDeltaEvent(event)) { + if (event.content === null && event.reasoning_content === null) { + return newUiEvents; + } + + const lastIndex = newUiEvents.length - 1; + const lastEvent = newUiEvents[lastIndex]; + if (lastEvent && isStreamingDeltaEvent(lastEvent)) { + newUiEvents[lastIndex] = mergeStreamingDeltaEvent(event, lastEvent); + return newUiEvents; + } + + newUiEvents.push(event); + return newUiEvents; + } + + if ( + (isActionEvent(event) && event.action.kind === "FinishAction") || + (isMessageEvent(event) && event.source === "agent") + ) { + const finalizedUiEvents = finalizeStreamingDeltasInPlace( + event, + newUiEvents, + ); + if (finalizedUiEvents) { + return finalizedUiEvents; + } + } + if (isACPToolCallEvent(event)) { const existingIndex = newUiEvents.findIndex( (uiEvent) => diff --git a/tests/e2e/snapshots/settings-page.snapshot.spec.ts b/tests/e2e/snapshots/settings-page.snapshot.spec.ts 
index 8190066e2..3197b392b 100644 --- a/tests/e2e/snapshots/settings-page.snapshot.spec.ts +++ b/tests/e2e/snapshots/settings-page.snapshot.spec.ts @@ -92,6 +92,14 @@ async function setupMocks(page: Page, showConsentModal = false) { }); }); + await page.route("**/api/workspaces**", async (route) => { + await route.fulfill({ + status: 200, + contentType: "application/json", + body: JSON.stringify({ workspaces: [], workspaceParents: [] }), + }); + }); + await page.route("**/api/bash/execute_bash_command", async (route) => { await route.fulfill({ status: 200, @@ -161,7 +169,7 @@ test.describe("UI Visual Snapshots", () => { await dismissConsentModal(page); const homeScreen = page.getByTestId("home-screen"); - await expect(homeScreen).toBeVisible(); + await expect(homeScreen).toBeVisible({ timeout: 15_000 }); await page.waitForLoadState("networkidle"); const rootLayout = page.getByTestId("root-layout");