41 changes: 33 additions & 8 deletions example/convex/chat.ts
@@ -21,25 +21,50 @@
// Lets grab the history up to now so that the AI has some context
const history = await ctx.runQuery(internal.messages.getHistory);

- // Lets kickoff a stream request to OpenAI
- const stream = await openai.chat.completions.create({
- model: "gpt-4.1-mini",
- messages: [
+ // o4-mini works best with the Responses API for reasoning
+ const response = await openai.responses.create({
+ model: "o4-mini",
+ input: [
{
role: "system",
content: `You are a helpful assistant that can answer questions and help with tasks.
Please provide your response in markdown format.

You are continuing a conversation. The conversation so far is found in the following JSON-formatted value:`,
},
...history,
],
Comment on lines +27 to +36 (Copilot AI, Dec 7, 2025):

The parameter `input` is used instead of `messages`. The OpenAI Chat Completions API uses `messages` as the parameter name for the conversation history. If this is intended to use a different/future API, please document which version of the OpenAI SDK supports this interface.
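For reference, a minimal sketch contrasting the two call shapes, assuming an openai-node SDK version recent enough to ship the Responses API (the exact version cutoff is an assumption here, not something this PR documents):

```ts
import OpenAI from "openai";

const openai = new OpenAI();

async function compareApis() {
  // Chat Completions API: conversation history is passed as `messages`
  const completion = await openai.chat.completions.create({
    model: "gpt-4.1-mini",
    messages: [{ role: "user", content: "Hello" }],
    stream: true,
  });

  // Responses API: the equivalent parameter is named `input`
  const response = await openai.responses.create({
    model: "o4-mini",
    input: [{ role: "user", content: "Hello" }],
    stream: true,
  });

  return { completion, response };
}
```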
reasoning: {
effort: "medium",
summary: "auto", // Get reasoning summary
},
stream: true,
});

- // Append each chunk to the persistent stream as they come in from openai
- for await (const part of stream)
- await append(part.choices[0]?.delta?.content || "");
+ let currentReasoning = "";
+ let currentText = "";
Comment on lines +44 to +45: 🛠️ Refactor suggestion | 🟠 Major

Remove unused variables.

`currentReasoning` and `currentText` are accumulated but never read. They appear to be leftover from an earlier implementation.

-      let currentReasoning = "";
-      let currentText = "";
-
       // Process the streaming response
       for await (const event of response) {

🤖 Prompt for AI Agents: In example/convex/chat.ts around lines 44-45, the variables currentReasoning and currentText are declared and accumulated but never read; remove these unused variables and any related accumulation code (appends/concatenations) so only live variables remain: delete the declarations and search for any assignments or += operations tied to them and remove those lines as well.

Comment on lines +44 to +45 (Copilot AI, Dec 7, 2025):

These variables currentReasoning and currentText are declared but never used. They accumulate values from the event stream but are not referenced anywhere else in the code. Consider removing them if they're not needed, or use them if they serve a purpose (e.g., for logging or validation).

// Process the streaming response
for await (const event of response) {

// Handle reasoning summary chunks
if (event.type === "response.reasoning_summary_text.delta") {
currentReasoning += event.delta || "";

⚠ Check warning on line 52 in example/convex/chat.ts (GitHub Actions / Test and lint): 'currentReasoning' is assigned a value but never used. Allowed unused vars must match /^_/u
await append({
text: "",
reasoning: event.delta || "",
});
}

// Handle output text chunks
if (event.type === "response.output_text.delta") {
currentText += event.delta || "";

⚠ Check warning on line 61 in example/convex/chat.ts (GitHub Actions / Test and lint): 'currentText' is assigned a value but never used. Allowed unused vars must match /^_/u
await append({
text: event.delta || "",
reasoning: "",
});
}
Comment on lines +60 to +66 (Copilot AI, Dec 7, 2025):

The event processing loop doesn't handle other event types that might be emitted by the streaming API. If the API emits events other than response.reasoning_summary_text.delta and response.output_text.delta (such as error events, completion events, or other metadata), they will be silently ignored. Consider adding a default case to handle or log unexpected event types for debugging purposes.

Suggested change:
-        if (event.type === "response.output_text.delta") {
-          currentText += event.delta || "";
-          await append({
-            text: event.delta || "",
-            reasoning: "",
-          });
-        }
+        else if (event.type === "response.output_text.delta") {
+          currentText += event.delta || "";
+          await append({
+            text: event.delta || "",
+            reasoning: "",
+          });
+        }
+        // Handle unexpected event types
+        else {
+          console.warn(
+            `Unhandled event type in streaming response: ${event.type}`,
+            event
+          );
+        }
}
},
);

12 changes: 10 additions & 2 deletions example/src/components/ServerMessage.tsx
@@ -17,7 +17,7 @@ export function ServerMessage({
stopStreaming: () => void;
scrollToBottom: () => void;
}) {
- const { text, status } = useStream(
+ const { text, reasoning, status } = useStream(
api.streaming.getStreamBody,
new URL(`${getConvexSiteUrl()}/chat-stream`),
isDriven,
@@ -42,7 +42,15 @@

return (
<div className="md-answer">
- <Markdown>{text || "Thinking..."}</Markdown>
+ {reasoning && reasoning.length > 0 && (
+ <div className="mb-3 pb-3 border-b border-gray-200">
+ <div className="text-xs text-gray-500 mb-2">Reasoning:</div>
+ <div className="text-sm text-gray-600">
+ <Markdown>{reasoning}</Markdown>
+ </div>
+ </div>
+ )}
+ <Markdown>{text}</Markdown>
Copilot AI, Dec 7, 2025:

The fallback text "Thinking..." has been removed, but no replacement is provided when text is empty. This means that during the initial "pending" state before any text has streamed, the UI will show an empty <Markdown> component. Consider adding back a loading indicator or placeholder text to provide feedback to users while the stream is starting.

Suggested change:
-      <Markdown>{text}</Markdown>
+      {isCurrentlyStreaming && !text ? (
+        <div className="text-gray-400 italic">Thinking...</div>
+      ) : (
+        <Markdown>{text}</Markdown>
+      )}
{status === "error" && (
<div className="text-red-500 mt-2">Error loading response</div>
)}
36 changes: 25 additions & 11 deletions src/client/index.ts
@@ -14,10 +14,12 @@ export type StreamId = string & { __isStreamId: true };
export const StreamIdValidator = v.string();
export type StreamBody = {
text: string;
reasoning: string;
status: StreamStatus;
};

Copilot AI, Dec 7, 2025:

Consider adding a JSDoc comment for the new StreamChunk type to document that it supports both legacy string format and the new object format with optional reasoning field. This would help developers understand the API evolution and migration path.

Suggested change:
+/**
+ * Represents a chunk of streamed data.
+ *
+ * Supports both the legacy string format and the new object format.
+ * - Legacy format: `string` (e.g., "Hello world")
+ * - Object format: `{ text: string; reasoning?: string }`
+ *
+ * The object format allows for optional reasoning metadata to be attached to each chunk.
+ * This dual format supports API evolution: existing code using strings will continue to work,
+ * while new code can migrate to the richer object format as needed.
+ */
- export type ChunkAppender = (text: string) => Promise<void>;
+ export type StreamChunk = string | { text: string; reasoning?: string };
+ export type ChunkAppender = (chunk: StreamChunk) => Promise<void>;
export type StreamWriter<A extends GenericActionCtx<GenericDataModel>> = (
ctx: A,
request: Request,
@@ -77,11 +79,15 @@ export class PersistentTextStreaming {
ctx: RunQueryCtx,
streamId: StreamId,
): Promise<StreamBody> {
- const { text, status } = await ctx.runQuery(
+ const { text, reasoning, status } = await ctx.runQuery(
this.component.lib.getStreamText,
{ streamId },
);
- return { text, status: status as StreamStatus };
+ return {
+ text,
+ reasoning: reasoning ?? "",
+ status: status as StreamStatus
Copilot AI, Dec 7, 2025:

[nitpick] Missing trailing comma. The other properties in this object have trailing commas (lines 87-88), so line 89 should also have one for consistency with the codebase style.

Suggested change:
-      status: status as StreamStatus
+      status: status as StreamStatus,
};
}

/**
@@ -125,14 +131,19 @@
let writer =
writable.getWriter() as WritableStreamDefaultWriter<Uint8Array> | null;
const textEncoder = new TextEncoder();
let pending = "";
let pending = { text: "", reasoning: "" };

const doStream = async () => {
- const chunkAppender: ChunkAppender = async (text) => {
+ const chunkAppender: ChunkAppender = async (chunk) => {
+ // Normalize input to object form
+ const normalized = typeof chunk === "string" ? { text: chunk } : chunk;

// write to this handler's response stream on every update
if (writer) {
try {
- await writer.write(textEncoder.encode(text));
+ await writer.write(textEncoder.encode(
+ JSON.stringify(normalized) + "\n"
+ ));
} catch (e) {
console.error("Error writing to stream", e);
console.error(
Expand All @@ -141,11 +152,12 @@ export class PersistentTextStreaming {
writer = null;
}
}
- pending += text;
+ pending.text += normalized.text;
+ pending.reasoning += normalized.reasoning || "";
// write to the database periodically, like at the end of sentences
- if (hasDelimeter(text)) {
- await this.addChunk(ctx, streamId, pending, false);
- pending = "";
+ if (hasDelimeter(normalized.text)) {
+ await this.addChunk(ctx, streamId, pending.text, pending.reasoning, false);
+ pending = { text: "", reasoning: "" };
}
};
try {
@@ -159,7 +171,7 @@
}

// Success? Flush any last updates
- await this.addChunk(ctx, streamId, pending, true);
+ await this.addChunk(ctx, streamId, pending.text, pending.reasoning, true);

if (writer) {
await writer.close();
@@ -178,11 +190,13 @@
ctx: RunMutationCtx,
streamId: StreamId,
text: string,
reasoning: string,
final: boolean,
) {
await ctx.runMutation(this.component.lib.addChunk, {
streamId,
text,
reasoning: reasoning || undefined,
final,
});
}
3 changes: 2 additions & 1 deletion src/component/_generated/component.ts
@@ -27,7 +27,7 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
addChunk: FunctionReference<
"mutation",
"internal",
- { final: boolean; streamId: string; text: string },
+ { final: boolean; reasoning?: string; streamId: string; text: string },
any,
Name
>;
@@ -44,6 +44,7 @@ export type ComponentApi<Name extends string | undefined = string | undefined> =
"internal",
{ streamId: string },
{
reasoning: string;
status: "pending" | "streaming" | "done" | "error" | "timeout";
text: string;
},
6 changes: 6 additions & 0 deletions src/component/lib.ts
@@ -20,6 +20,7 @@ export const addChunk = mutation({
args: {
streamId: v.id("streams"),
text: v.string(),
reasoning: v.optional(v.string()),
final: v.boolean(),
},
handler: async (ctx, args) => {
@@ -37,6 +38,7 @@
await ctx.db.insert("chunks", {
streamId: args.streamId,
text: args.text,
reasoning: args.reasoning,
});
if (args.final) {
await ctx.db.patch(args.streamId, {
@@ -97,6 +99,7 @@ export const getStreamText = query({
},
returns: v.object({
text: v.string(),
reasoning: v.string(),
status: streamStatusValidator,
}),
handler: async (ctx, args) => {
@@ -105,15 +108,18 @@
throw new Error("Stream not found");
}
let text = "";
let reasoning = "";
if (stream.status !== "pending") {
const chunks = await ctx.db
.query("chunks")
.withIndex("byStream", (q) => q.eq("streamId", args.streamId))
.collect();
text = chunks.map((chunk) => chunk.text).join("");
reasoning = chunks.map((chunk) => chunk.reasoning || "").join("");
}
return {
text,
reasoning,
status: stream.status,
};
},
1 change: 1 addition & 0 deletions src/component/schema.ts
@@ -17,5 +17,6 @@ export default defineSchema({
chunks: defineTable({
streamId: v.id("streams"),
text: v.string(),
reasoning: v.optional(v.string()),
}).index("byStream", ["streamId"]),
});
94 changes: 77 additions & 17 deletions src/react/index.ts
@@ -63,7 +63,7 @@ export function useStream(
getPersistentBody,
usePersistence && streamId ? { streamId } : "skip",
);
- const [streamBody, setStreamBody] = useState<string>("");
+ const [streamBody, setStreamBody] = useState({ text: "", reasoning: "" });

useEffect(() => {
if (driven && streamId && !streamStarted.current) {
@@ -72,8 +72,11 @@
const success = await startStreaming(
streamUrl,
streamId,
- (text) => {
- setStreamBody((prev) => prev + text);
+ (chunk) => {
+ setStreamBody((prev) => ({
+ text: prev.text + chunk.text,
+ reasoning: prev.reasoning + (chunk.reasoning || ""),
+ }));
},
{
...opts?.headers,
@@ -110,12 +113,13 @@
}
let status: StreamStatus;
if (streamEnded === null) {
- status = streamBody.length > 0 ? "streaming" : "pending";
+ status = streamBody.text.length > 0 ? "streaming" : "pending";
} else {
status = streamEnded ? "done" : "error";
}
return {
- text: streamBody,
+ text: streamBody.text,
+ reasoning: streamBody.reasoning,
status: status as StreamStatus,
};
}, [persistentBody, streamBody, streamEnded]);
@@ -139,7 +143,7 @@
async function startStreaming(
url: URL,
streamId: StreamId,
- onUpdate: (text: string) => void,
+ onUpdate: (chunk: { text: string; reasoning?: string }) => void,
headers: Record<string, string>,
) {
const response = await fetch(url, {
@@ -162,18 +166,74 @@
console.error("No body in response", response);
return false;
}
- const reader = response.body.getReader();
- while (true) {
- try {
- const { done, value } = await reader.read();
- if (done) {
- onUpdate(new TextDecoder().decode(value));
- return true;

const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();

// Read first chunk to detect format
const firstRead = await reader.read();
if (firstRead.done) {
return true;
}

const firstChunk = firstRead.value;
const isJsonMode = firstChunk.trimStart().startsWith("{");
Copilot AI, Dec 7, 2025:

The format detection logic assumes that JSON mode starts with {, but this check is performed on the trimmed first chunk which may not represent the complete first JSON object. If the first chunk is very small (e.g., just {), or if there's whitespace before the JSON, this detection could be unreliable. Consider a more robust detection method, such as checking for a complete JSON object or using a header/flag to indicate the format.
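One way to make the detection explicit, as the comment suggests, is to declare the framing in a response header instead of sniffing the first chunk. A minimal sketch using only web-standard APIs; the `application/x-ndjson` content type and both helper names are assumptions for illustration, not part of this PR:

```ts
// Server side (sketch): declare the framing when building the streaming Response.
function makeStreamResponse(readable: ReadableStream<Uint8Array>): Response {
  return new Response(readable, {
    headers: { "Content-Type": "application/x-ndjson" },
  });
}

// Client side (sketch): branch on the declared format instead of the first byte.
function isNdjsonResponse(response: Response): boolean {
  const contentType = response.headers.get("content-type") ?? "";
  return contentType.includes("application/x-ndjson");
}
```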

if (isJsonMode) {
// JSON mode: split by newlines and parse each line
let buffer = firstChunk;

const processBuffer = () => {
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.trim()) {
try {
onUpdate(JSON.parse(line));
} catch {
onUpdate({ text: line });
Comment on lines +192 to +193 (Copilot AI, Dec 7, 2025):

The catch block silently swallows JSON parse errors. Consider logging the parse error and the problematic line to aid debugging when malformed JSON is received from the server.
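A minimal sketch of the logged variant; the `handleLine` helper is hypothetical, and the plain-text fallback is kept so a malformed line still reaches the UI:

```ts
// Hypothetical helper: parse one NDJSON line, logging instead of silently swallowing errors.
function handleLine(
  line: string,
  onUpdate: (chunk: { text: string; reasoning?: string }) => void,
): void {
  try {
    onUpdate(JSON.parse(line));
  } catch (err) {
    console.warn("Malformed JSON line in stream; treating as plain text", { line, err });
    onUpdate({ text: line });
  }
}
```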
}
}
}
};

processBuffer();

while (true) {
try {
const { done, value } = await reader.read();
if (done) {
// Flush remaining buffer
if (buffer.trim()) {
try {
onUpdate(JSON.parse(buffer));
} catch {
onUpdate({ text: buffer });
Comment on lines +209 to +210 (Copilot AI, Dec 7, 2025):

The catch block silently swallows JSON parse errors. Consider logging the parse error and the problematic buffer content to aid debugging when malformed JSON is received from the server.
}
}
return true;
}
buffer += value;
processBuffer();
} catch (e) {
console.error("Error reading stream", e);
return false;
}
}
} else {
// Plain text mode: pass chunks through directly
onUpdate({ text: firstChunk });

while (true) {
try {
const { done, value } = await reader.read();
if (done) {
return true;
}
onUpdate({ text: value });
} catch (e) {
console.error("Error reading stream", e);
return false;
}
- onUpdate(new TextDecoder().decode(value));
- } catch (e) {
- console.error("Error reading stream", e);
- return false;
- }
}
}