diff --git a/example/convex/chat.ts b/example/convex/chat.ts
index 190c501..f81650a 100644
--- a/example/convex/chat.ts
+++ b/example/convex/chat.ts
@@ -21,25 +21,50 @@ export const streamChat = httpAction(async (ctx, request) => {
// Lets grab the history up to now so that the AI has some context
const history = await ctx.runQuery(internal.messages.getHistory);
- // Lets kickoff a stream request to OpenAI
- const stream = await openai.chat.completions.create({
- model: "gpt-4.1-mini",
- messages: [
+ // o4-mini works best with the Responses API for reasoning
+ const response = await openai.responses.create({
+ model: "o4-mini",
+ input: [
{
role: "system",
content: `You are a helpful assistant that can answer questions and help with tasks.
Please provide your response in markdown format.
-
+
You are continuing a conversation. The conversation so far is found in the following JSON-formatted value:`,
},
...history,
],
+ reasoning: {
+ effort: "medium",
+ summary: "auto", // Get reasoning summary
+ },
stream: true,
});
- // Append each chunk to the persistent stream as they come in from openai
- for await (const part of stream)
- await append(part.choices[0]?.delta?.content || "");
+ let currentReasoning = "";
+ let currentText = "";
+
+ // Process the streaming response
+ for await (const event of response) {
+
+ // Handle reasoning summary chunks
+ if (event.type === "response.reasoning_summary_text.delta") {
+ currentReasoning += event.delta || "";
+ await append({
+ text: "",
+ reasoning: event.delta || "",
+ });
+ }
+
+ // Handle output text chunks
+ if (event.type === "response.output_text.delta") {
+ currentText += event.delta || "";
+ await append({
+ text: event.delta || "",
+ reasoning: "",
+ });
+ }
+ }
},
);
diff --git a/example/src/components/ServerMessage.tsx b/example/src/components/ServerMessage.tsx
index 3518238..55ad770 100644
--- a/example/src/components/ServerMessage.tsx
+++ b/example/src/components/ServerMessage.tsx
@@ -17,7 +17,7 @@ export function ServerMessage({
stopStreaming: () => void;
scrollToBottom: () => void;
}) {
- const { text, status } = useStream(
+ const { text, reasoning, status } = useStream(
api.streaming.getStreamBody,
new URL(`${getConvexSiteUrl()}/chat-stream`),
isDriven,
@@ -42,7 +42,15 @@ export function ServerMessage({
return (
-
{text || "Thinking..."}
+ {reasoning && reasoning.length > 0 && (
+
+
Reasoning:
+
+ {reasoning}
+
+
+ )}
+
{text}
{status === "error" && (
Error loading response
)}
diff --git a/src/client/index.ts b/src/client/index.ts
index 2140955..1f987ba 100644
--- a/src/client/index.ts
+++ b/src/client/index.ts
@@ -14,10 +14,12 @@ export type StreamId = string & { __isStreamId: true };
export const StreamIdValidator = v.string();
export type StreamBody = {
text: string;
+ reasoning: string;
status: StreamStatus;
};
-export type ChunkAppender = (text: string) => Promise
;
+export type StreamChunk = string | { text: string; reasoning?: string };
+export type ChunkAppender = (chunk: StreamChunk) => Promise;
export type StreamWriter> = (
ctx: A,
request: Request,
@@ -77,11 +79,15 @@ export class PersistentTextStreaming {
ctx: RunQueryCtx,
streamId: StreamId,
): Promise {
- const { text, status } = await ctx.runQuery(
+ const { text, reasoning, status } = await ctx.runQuery(
this.component.lib.getStreamText,
{ streamId },
);
- return { text, status: status as StreamStatus };
+ return {
+ text,
+ reasoning: reasoning ?? "",
+ status: status as StreamStatus
+ };
}
/**
@@ -125,14 +131,19 @@ export class PersistentTextStreaming {
let writer =
writable.getWriter() as WritableStreamDefaultWriter | null;
const textEncoder = new TextEncoder();
- let pending = "";
+ let pending = { text: "", reasoning: "" };
const doStream = async () => {
- const chunkAppender: ChunkAppender = async (text) => {
+ const chunkAppender: ChunkAppender = async (chunk) => {
+ // Normalize input to object form
+ const normalized = typeof chunk === "string" ? { text: chunk } : chunk;
+
// write to this handler's response stream on every update
if (writer) {
try {
- await writer.write(textEncoder.encode(text));
+ await writer.write(textEncoder.encode(
+ JSON.stringify(normalized) + "\n"
+ ));
} catch (e) {
console.error("Error writing to stream", e);
console.error(
@@ -141,11 +152,12 @@ export class PersistentTextStreaming {
writer = null;
}
}
- pending += text;
+ pending.text += normalized.text;
+ pending.reasoning += normalized.reasoning || "";
// write to the database periodically, like at the end of sentences
- if (hasDelimeter(text)) {
- await this.addChunk(ctx, streamId, pending, false);
- pending = "";
+ if (hasDelimeter(normalized.text)) {
+ await this.addChunk(ctx, streamId, pending.text, pending.reasoning, false);
+ pending = { text: "", reasoning: "" };
}
};
try {
@@ -159,7 +171,7 @@ export class PersistentTextStreaming {
}
// Success? Flush any last updates
- await this.addChunk(ctx, streamId, pending, true);
+ await this.addChunk(ctx, streamId, pending.text, pending.reasoning, true);
if (writer) {
await writer.close();
@@ -178,11 +190,13 @@ export class PersistentTextStreaming {
ctx: RunMutationCtx,
streamId: StreamId,
text: string,
+ reasoning: string,
final: boolean,
) {
await ctx.runMutation(this.component.lib.addChunk, {
streamId,
text,
+ reasoning: reasoning || undefined,
final,
});
}
diff --git a/src/component/_generated/component.ts b/src/component/_generated/component.ts
index 6053923..5c2db6d 100644
--- a/src/component/_generated/component.ts
+++ b/src/component/_generated/component.ts
@@ -27,7 +27,7 @@ export type ComponentApi =
addChunk: FunctionReference<
"mutation",
"internal",
- { final: boolean; streamId: string; text: string },
+ { final: boolean; reasoning?: string; streamId: string; text: string },
any,
Name
>;
@@ -44,6 +44,7 @@ export type ComponentApi =
"internal",
{ streamId: string },
{
+ reasoning: string;
status: "pending" | "streaming" | "done" | "error" | "timeout";
text: string;
},
diff --git a/src/component/lib.ts b/src/component/lib.ts
index cc01906..a781f6f 100644
--- a/src/component/lib.ts
+++ b/src/component/lib.ts
@@ -20,6 +20,7 @@ export const addChunk = mutation({
args: {
streamId: v.id("streams"),
text: v.string(),
+ reasoning: v.optional(v.string()),
final: v.boolean(),
},
handler: async (ctx, args) => {
@@ -37,6 +38,7 @@ export const addChunk = mutation({
await ctx.db.insert("chunks", {
streamId: args.streamId,
text: args.text,
+ reasoning: args.reasoning,
});
if (args.final) {
await ctx.db.patch(args.streamId, {
@@ -97,6 +99,7 @@ export const getStreamText = query({
},
returns: v.object({
text: v.string(),
+ reasoning: v.string(),
status: streamStatusValidator,
}),
handler: async (ctx, args) => {
@@ -105,15 +108,18 @@ export const getStreamText = query({
throw new Error("Stream not found");
}
let text = "";
+ let reasoning = "";
if (stream.status !== "pending") {
const chunks = await ctx.db
.query("chunks")
.withIndex("byStream", (q) => q.eq("streamId", args.streamId))
.collect();
text = chunks.map((chunk) => chunk.text).join("");
+ reasoning = chunks.map((chunk) => chunk.reasoning || "").join("");
}
return {
text,
+ reasoning,
status: stream.status,
};
},
diff --git a/src/component/schema.ts b/src/component/schema.ts
index d7435fc..99c1d47 100644
--- a/src/component/schema.ts
+++ b/src/component/schema.ts
@@ -17,5 +17,6 @@ export default defineSchema({
chunks: defineTable({
streamId: v.id("streams"),
text: v.string(),
+ reasoning: v.optional(v.string()),
}).index("byStream", ["streamId"]),
});
diff --git a/src/react/index.ts b/src/react/index.ts
index ef82b0e..5dafcb0 100644
--- a/src/react/index.ts
+++ b/src/react/index.ts
@@ -63,7 +63,7 @@ export function useStream(
getPersistentBody,
usePersistence && streamId ? { streamId } : "skip",
);
- const [streamBody, setStreamBody] = useState("");
+ const [streamBody, setStreamBody] = useState({ text: "", reasoning: "" });
useEffect(() => {
if (driven && streamId && !streamStarted.current) {
@@ -72,8 +72,11 @@ export function useStream(
const success = await startStreaming(
streamUrl,
streamId,
- (text) => {
- setStreamBody((prev) => prev + text);
+ (chunk) => {
+ setStreamBody((prev) => ({
+ text: prev.text + chunk.text,
+ reasoning: prev.reasoning + (chunk.reasoning || ""),
+ }));
},
{
...opts?.headers,
@@ -110,12 +113,13 @@ export function useStream(
}
let status: StreamStatus;
if (streamEnded === null) {
- status = streamBody.length > 0 ? "streaming" : "pending";
+ status = streamBody.text.length > 0 ? "streaming" : "pending";
} else {
status = streamEnded ? "done" : "error";
}
return {
- text: streamBody,
+ text: streamBody.text,
+ reasoning: streamBody.reasoning,
status: status as StreamStatus,
};
}, [persistentBody, streamBody, streamEnded]);
@@ -139,7 +143,7 @@ export function useStream(
async function startStreaming(
url: URL,
streamId: StreamId,
- onUpdate: (text: string) => void,
+ onUpdate: (chunk: { text: string; reasoning?: string }) => void,
headers: Record,
) {
const response = await fetch(url, {
@@ -162,18 +166,74 @@ async function startStreaming(
console.error("No body in response", response);
return false;
}
- const reader = response.body.getReader();
- while (true) {
- try {
- const { done, value } = await reader.read();
- if (done) {
- onUpdate(new TextDecoder().decode(value));
- return true;
+
+ const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
+
+ // Read first chunk to detect format
+ const firstRead = await reader.read();
+ if (firstRead.done) {
+ return true;
+ }
+
+ const firstChunk = firstRead.value;
+ const isJsonMode = firstChunk.trimStart().startsWith("{");
+
+ if (isJsonMode) {
+ // JSON mode: split by newlines and parse each line
+ let buffer = firstChunk;
+
+ const processBuffer = () => {
+ const lines = buffer.split("\n");
+ buffer = lines.pop() || "";
+ for (const line of lines) {
+ if (line.trim()) {
+ try {
+ onUpdate(JSON.parse(line));
+ } catch {
+ onUpdate({ text: line });
+ }
+ }
+ }
+ };
+
+ processBuffer();
+
+ while (true) {
+ try {
+ const { done, value } = await reader.read();
+ if (done) {
+ // Flush remaining buffer
+ if (buffer.trim()) {
+ try {
+ onUpdate(JSON.parse(buffer));
+ } catch {
+ onUpdate({ text: buffer });
+ }
+ }
+ return true;
+ }
+ buffer += value;
+ processBuffer();
+ } catch (e) {
+ console.error("Error reading stream", e);
+ return false;
+ }
+ }
+ } else {
+ // Plain text mode: pass chunks through directly
+ onUpdate({ text: firstChunk });
+
+ while (true) {
+ try {
+ const { done, value } = await reader.read();
+ if (done) {
+ return true;
+ }
+ onUpdate({ text: value });
+ } catch (e) {
+ console.error("Error reading stream", e);
+ return false;
}
- onUpdate(new TextDecoder().decode(value));
- } catch (e) {
- console.error("Error reading stream", e);
- return false;
}
}
}