From 383b73ea7c7f52ce6e39a595bf91afad27f1e3fd Mon Sep 17 00:00:00 2001
From: Daniel Sticker
Date: Tue, 18 Nov 2025 12:28:46 +0100
Subject: [PATCH 1/3] Add optional reasoning field to streams, update example
 to use OpenAI o4-mini, and display reasoning summaries in the UI above the
 main response in example

---
 example/convex/chat.ts                   | 41 +++++++++++++++----
 example/src/components/ServerMessage.tsx | 12 +++++-
 src/client/index.ts                      | 36 +++++++++++------
 src/component/_generated/component.ts    |  3 +-
 src/component/lib.ts                     |  6 +++
 src/component/schema.ts                  |  1 +
 src/react/index.ts                       | 50 +++++++++++++++++++-----
 7 files changed, 117 insertions(+), 32 deletions(-)

diff --git a/example/convex/chat.ts b/example/convex/chat.ts
index 190c501..1eb1af9 100644
--- a/example/convex/chat.ts
+++ b/example/convex/chat.ts
@@ -21,25 +21,50 @@ export const streamChat = httpAction(async (ctx, request) => {
     // Lets grab the history up to now so that the AI has some context
     const history = await ctx.runQuery(internal.messages.getHistory);
 
-    // Lets kickoff a stream request to OpenAI
-    const stream = await openai.chat.completions.create({
-      model: "gpt-4.1-mini",
-      messages: [
+    // o4-mini works best with the Responses API for reasoning
+    const response = await (openai as any).responses.create({
+      model: "o4-mini",
+      input: [
         {
           role: "system",
           content: `You are a helpful assistant that can answer questions and help with tasks.
           Please provide your response in markdown format.
-          
+
           You are continuing a conversation. The conversation so far is found in the following JSON-formatted value:`,
         },
         ...history,
       ],
+      reasoning: {
+        effort: "medium",
+        summary: "auto", // Get reasoning summary
+      },
       stream: true,
     });
 
-    // Append each chunk to the persistent stream as they come in from openai
-    for await (const part of stream)
-      await append(part.choices[0]?.delta?.content || "");
+    let currentReasoning = "";
+    let currentText = "";
+
+    // Process the streaming response
+    for await (const event of response) {
+
+      // Handle reasoning summary chunks
+      if (event.type === "response.reasoning_summary_text.delta") {
+        currentReasoning += event.delta || "";
+        await append({
+          text: "",
+          reasoning: event.delta || "",
+        });
+      }
+
+      // Handle output text chunks
+      if (event.type === "response.output_text.delta") {
+        currentText += event.delta || "";
+        await append({
+          text: event.delta || "",
+          reasoning: "",
+        });
+      }
+    }
   },
 );
 
diff --git a/example/src/components/ServerMessage.tsx b/example/src/components/ServerMessage.tsx
index 3518238..55ad770 100644
--- a/example/src/components/ServerMessage.tsx
+++ b/example/src/components/ServerMessage.tsx
@@ -17,7 +17,7 @@ export function ServerMessage({
   stopStreaming: () => void;
   scrollToBottom: () => void;
 }) {
-  const { text, status } = useStream(
+  const { text, reasoning, status } = useStream(
     api.streaming.getStreamBody,
     new URL(`${getConvexSiteUrl()}/chat-stream`),
     isDriven,
@@ -42,7 +42,15 @@ export function ServerMessage({
   return (
     <div
-      {text || "Thinking..."}
+      {reasoning && reasoning.length > 0 && (
+        <div>
+          <div>Reasoning:</div>
+          <div>
+            {reasoning}
+          </div>
+        </div>
+      )}
+      {text}
       {status === "error" && (
         <div>Error loading response</div>
       )}
diff --git a/src/client/index.ts b/src/client/index.ts
index 2140955..1f987ba 100644
--- a/src/client/index.ts
+++ b/src/client/index.ts
@@ -14,10 +14,12 @@ export type StreamId = string & { __isStreamId: true };
 export const StreamIdValidator = v.string();
 export type StreamBody = {
   text: string;
+  reasoning: string;
   status: StreamStatus;
 };
 
-export type ChunkAppender = (text: string) => Promise<void>;
+export type StreamChunk = string | { text: string; reasoning?: string };
+export type ChunkAppender = (chunk: StreamChunk) => Promise<void>;
 export type StreamWriter<A> = (
   ctx: A,
   request: Request,
@@ -77,11 +79,15 @@
     ctx: RunQueryCtx,
     streamId: StreamId,
   ): Promise<StreamBody> {
-    const { text, status } = await ctx.runQuery(
+    const { text, reasoning, status } = await ctx.runQuery(
       this.component.lib.getStreamText,
       { streamId },
     );
-    return { text, status: status as StreamStatus };
+    return {
+      text,
+      reasoning: reasoning ?? "",
+      status: status as StreamStatus
+    };
   }
 
   /**
@@ -125,14 +131,19 @@
     let writer = writable.getWriter() as WritableStreamDefaultWriter | null;
     const textEncoder = new TextEncoder();
 
-    let pending = "";
+    let pending = { text: "", reasoning: "" };
 
     const doStream = async () => {
-      const chunkAppender: ChunkAppender = async (text) => {
+      const chunkAppender: ChunkAppender = async (chunk) => {
+        // Normalize input to object form
+        const normalized = typeof chunk === "string" ? { text: chunk } : chunk;
+
         // write to this handler's response stream on every update
         if (writer) {
           try {
-            await writer.write(textEncoder.encode(text));
+            await writer.write(textEncoder.encode(
+              JSON.stringify(normalized) + "\n"
+            ));
           } catch (e) {
             console.error("Error writing to stream", e);
             console.error(
             writer = null;
           }
         }
-        pending += text;
+        pending.text += normalized.text;
+        pending.reasoning += normalized.reasoning || "";
         // write to the database periodically, like at the end of sentences
-        if (hasDelimeter(text)) {
-          await this.addChunk(ctx, streamId, pending, false);
-          pending = "";
+        if (hasDelimeter(normalized.text)) {
+          await this.addChunk(ctx, streamId, pending.text, pending.reasoning, false);
+          pending = { text: "", reasoning: "" };
         }
       };
       try {
       }
 
       // Success? Flush any last updates
-      await this.addChunk(ctx, streamId, pending, true);
+      await this.addChunk(ctx, streamId, pending.text, pending.reasoning, true);
 
       if (writer) {
         await writer.close();
@@ -178,11 +190,13 @@
     ctx: RunMutationCtx,
     streamId: StreamId,
     text: string,
+    reasoning: string,
     final: boolean,
   ) {
     await ctx.runMutation(this.component.lib.addChunk, {
       streamId,
       text,
+      reasoning: reasoning || undefined,
       final,
     });
   }
diff --git a/src/component/_generated/component.ts b/src/component/_generated/component.ts
index 6053923..5c2db6d 100644
--- a/src/component/_generated/component.ts
+++ b/src/component/_generated/component.ts
@@ -27,7 +27,7 @@ export type ComponentApi =
   addChunk: FunctionReference<
     "mutation",
     "internal",
-    { final: boolean; streamId: string; text: string },
+    { final: boolean; reasoning?: string; streamId: string; text: string },
     any,
     Name
   >;
@@ -44,6 +44,7 @@
     "internal",
     { streamId: string },
     {
+      reasoning: string;
       status: "pending" | "streaming" | "done" | "error" | "timeout";
       text: string;
     },
diff --git a/src/component/lib.ts b/src/component/lib.ts
index cc01906..a781f6f 100644
--- a/src/component/lib.ts
+++ b/src/component/lib.ts
@@ -20,6 +20,7 @@ export const addChunk = mutation({
   args: {
     streamId: v.id("streams"),
     text: v.string(),
+    reasoning: v.optional(v.string()),
     final: v.boolean(),
   },
   handler: async (ctx, args) => {
@@ -37,6 +38,7 @@ export const addChunk = mutation({
     await ctx.db.insert("chunks", {
       streamId: args.streamId,
       text: args.text,
+      reasoning: args.reasoning,
     });
     if (args.final) {
       await ctx.db.patch(args.streamId, {
@@ -97,6 +99,7 @@ export const getStreamText = query({
   },
   returns: v.object({
     text: v.string(),
+    reasoning: v.string(),
     status: streamStatusValidator,
   }),
   handler: async (ctx, args) => {
@@ -105,15 +108,18 @@ export const getStreamText = query({
       throw new Error("Stream not found");
     }
     let text = "";
+    let reasoning = "";
     if (stream.status !== "pending") {
       const chunks = await ctx.db
         .query("chunks")
         .withIndex("byStream", (q) => q.eq("streamId", args.streamId))
         .collect();
       text = chunks.map((chunk) => chunk.text).join("");
+      reasoning = chunks.map((chunk) => chunk.reasoning || "").join("");
     }
     return {
       text,
+      reasoning,
       status: stream.status,
     };
   },
diff --git a/src/component/schema.ts b/src/component/schema.ts
index d7435fc..99c1d47 100644
--- a/src/component/schema.ts
+++ b/src/component/schema.ts
@@ -17,5 +17,6 @@ export default defineSchema({
   chunks: defineTable({
     streamId: v.id("streams"),
     text: v.string(),
+    reasoning: v.optional(v.string()),
   }).index("byStream", ["streamId"]),
 });
diff --git a/src/react/index.ts b/src/react/index.ts
index ef82b0e..44a71ea 100644
--- a/src/react/index.ts
+++ b/src/react/index.ts
@@ -63,7 +63,7 @@ export function useStream(
     getPersistentBody,
     usePersistence && streamId ? { streamId } : "skip",
   );
-  const [streamBody, setStreamBody] = useState("");
+  const [streamBody, setStreamBody] = useState({ text: "", reasoning: "" });
 
   useEffect(() => {
     if (driven && streamId && !streamStarted.current) {
       const success = await startStreaming(
         streamUrl,
         streamId,
-        (text) => {
-          setStreamBody((prev) => prev + text);
+        (chunk) => {
+          setStreamBody((prev) => ({
+            text: prev.text + chunk.text,
+            reasoning: prev.reasoning + (chunk.reasoning || ""),
+          }));
         },
         {
           ...opts?.headers,
@@ -110,12 +113,13 @@
     }
     let status: StreamStatus;
     if (streamEnded === null) {
-      status = streamBody.length > 0 ? "streaming" : "pending";
+      status = streamBody.text.length > 0 ? "streaming" : "pending";
     } else {
       status = streamEnded ? "done" : "error";
     }
     return {
-      text: streamBody,
+      text: streamBody.text,
+      reasoning: streamBody.reasoning,
       status: status as StreamStatus,
     };
   }, [persistentBody, streamBody, streamEnded]);
@@ -139,7 +143,7 @@
 async function startStreaming(
   url: URL,
   streamId: StreamId,
-  onUpdate: (text: string) => void,
+  onUpdate: (chunk: { text: string; reasoning?: string }) => void,
   headers: Record<string, string>,
 ) {
   const response = await fetch(url, {
     console.error("No body in response", response);
     return false;
   }
-  const reader = response.body.getReader();
+
+  let buffer = '';
+  const lines = response.body
+    .pipeThrough(new TextDecoderStream())
+    .pipeThrough(new TransformStream({
+      transform(chunk, controller) {
+        buffer += chunk;
+        const lines = buffer.split('\n');
+        buffer = lines.pop() || '';
+        for (const line of lines) {
+          if (line.trim()) {
+            controller.enqueue(line);
+          }
+        }
+      },
+      flush(controller) {
+        if (buffer.trim()) {
+          controller.enqueue(buffer);
+        }
+      }
+    }))
+    .getReader();
+
   while (true) {
     try {
-      const { done, value } = await reader.read();
+      const { done, value } = await lines.read();
       if (done) {
-        onUpdate(new TextDecoder().decode(value));
         return true;
       }
-      onUpdate(new TextDecoder().decode(value));
+      try {
+        const chunk = JSON.parse(value);
+        onUpdate(chunk);
+      } catch (e) {
+        console.error("Error parsing chunk", e, value);
+      }
     } catch (e) {
       console.error("Error reading stream", e);
       return false;

From 69d8c42fa1ef2b1d782127e91f376f46c6d4ae61 Mon Sep 17 00:00:00 2001
From: Daniel Sticker
Date: Sun, 7 Dec 2025 20:16:04 +0100
Subject: [PATCH 2/3] refactor: remove unnecessary OpenAI type casting

---
 example/convex/chat.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/example/convex/chat.ts b/example/convex/chat.ts
index 1eb1af9..f81650a 100644
--- a/example/convex/chat.ts
+++ b/example/convex/chat.ts
@@ -22,7 +22,7 @@ export const streamChat = httpAction(async (ctx, request) => {
     const history = await ctx.runQuery(internal.messages.getHistory);
 
     // o4-mini works best with the Responses API for reasoning
-    const response = await (openai as any).responses.create({
+    const response = await openai.responses.create({
       model: "o4-mini",
       input: [
         {

From d0e1a4daa3ddbf28063af5971855653ab6a5d9c9 Mon Sep 17 00:00:00 2001
From: Daniel Sticker
Date: Sun, 7 Dec 2025 20:23:25 +0100
Subject: [PATCH 3/3] feat: better streaming response handling (JSON detection)

---
 src/react/index.ts | 92 ++++++++++++++++++++++++++++++----------------
 1 file changed, 61 insertions(+), 31 deletions(-)

diff --git a/src/react/index.ts b/src/react/index.ts
index 44a71ea..5dafcb0 100644
--- a/src/react/index.ts
+++ b/src/react/index.ts
@@ -166,44 +166,74 @@ async function startStreaming(
     console.error("No body in response", response);
     return false;
   }
-
-  let buffer = '';
-  const lines = response.body
-    .pipeThrough(new TextDecoderStream())
-    .pipeThrough(new TransformStream({
-      transform(chunk, controller) {
-        buffer += chunk;
-        const lines = buffer.split('\n');
-        buffer = lines.pop() || '';
-        for (const line of lines) {
-          if (line.trim()) {
-            controller.enqueue(line);
+
+  const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
+
+  // Read first chunk to detect format
+  const firstRead = await reader.read();
+  if (firstRead.done) {
+    return true;
+  }
+
+  const firstChunk = firstRead.value;
+  const isJsonMode = firstChunk.trimStart().startsWith("{");
+
+  if (isJsonMode) {
+    // JSON mode: split by newlines and parse each line
+    let buffer = firstChunk;
+
+    const processBuffer = () => {
+      const lines = buffer.split("\n");
+      buffer = lines.pop() || "";
+      for (const line of lines) {
+        if (line.trim()) {
+          try {
+            onUpdate(JSON.parse(line));
+          } catch {
+            onUpdate({ text: line });
           }
         }
-      },
-      flush(controller) {
-        if (buffer.trim()) {
-          controller.enqueue(buffer);
-        }
       }
-    }))
-    .getReader();
-
-  while (true) {
-    try {
-      const { done, value } = await lines.read();
-      if (done) {
-        return true;
+    };
+
+    processBuffer();
+
+    while (true) {
+      try {
+        const { done, value } = await reader.read();
+        if (done) {
+          // Flush remaining buffer
+          if (buffer.trim()) {
+            try {
+              onUpdate(JSON.parse(buffer));
+            } catch {
+              onUpdate({ text: buffer });
+            }
+          }
+          return true;
+        }
+        buffer += value;
+        processBuffer();
+      } catch (e) {
+        console.error("Error reading stream", e);
+        return false;
       }
+    }
+  } else {
+    // Plain text mode: pass chunks through directly
+    onUpdate({ text: firstChunk });
+
+    while (true) {
       try {
-        const chunk = JSON.parse(value);
-        onUpdate(chunk);
+        const { done, value } = await reader.read();
+        if (done) {
+          return true;
+        }
+        onUpdate({ text: value });
       } catch (e) {
-        console.error("Error parsing chunk", e, value);
+        console.error("Error reading stream", e);
+        return false;
       }
-    } catch (e) {
-      console.error("Error reading stream", e);
-      return false;
     }
   }
 }
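
Note on the wire format introduced by this series: after patch 1, every flush of
the HTTP response is a newline-delimited JSON object of the shape
{ text: string, reasoning?: string }, and patch 3 teaches the React client to
auto-detect that framing and fall back to plain text for older servers. As a
rough illustration, here is a minimal standalone consumer sketch in TypeScript.
It is not part of the patches; streamUrl and streamId are hypothetical
placeholders for the values the example app wires up.

// Sketch: consume the NDJSON framing introduced above, outside of React.
declare const streamUrl: URL;   // assumption: URL of the streaming HTTP action
declare const streamId: string; // assumption: id obtained when the stream was created

async function readStream(): Promise<void> {
  const response = await fetch(streamUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ streamId }),
  });
  if (!response.body) return;
  const reader = response.body
    .pipeThrough(new TextDecoderStream())
    .getReader();
  let buffered = "";
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    buffered += value;
    const parts = buffered.split("\n");
    buffered = parts.pop() ?? ""; // keep a partial trailing line for the next read
    for (const line of parts) {
      if (!line.trim()) continue;
      const chunk = JSON.parse(line) as { text: string; reasoning?: string };
      if (chunk.reasoning) console.log("[reasoning]", chunk.reasoning);
      if (chunk.text) console.log(chunk.text);
    }
  }
}

As in patch 3's JSON detection, a robust consumer should also be prepared for a
server that still emits raw text; this sketch assumes the post-patch NDJSON
framing only.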