Skip to content

Commit bab78ec

Browse files
authored
fix(agent): chunk coalescing fix (#1417)
1 parent bc1ab8e commit bab78ec

4 files changed

Lines changed: 278 additions & 16 deletions

File tree

packages/agent/src/agent.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ export class Agent {
175175

176176
async cleanup(): Promise<void> {
177177
if (this.sessionLogWriter && this.taskRunId) {
178-
await this.sessionLogWriter.flush(this.taskRunId);
178+
await this.sessionLogWriter.flush(this.taskRunId, { coalesce: true });
179179
}
180180
await this.acpConnection?.cleanup();
181181
}

packages/agent/src/server/agent-server.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,9 @@ export class AgentServer {
523523
// against async log persistence to object storage.
524524
let assistantMessage: string | undefined;
525525
try {
526-
await this.session.logWriter.flush(this.session.payload.run_id);
526+
await this.session.logWriter.flush(this.session.payload.run_id, {
527+
coalesce: true,
528+
});
527529
assistantMessage = this.session.logWriter.getFullAgentResponse(
528530
this.session.payload.run_id,
529531
);
@@ -747,6 +749,9 @@ export class AgentServer {
747749
});
748750

749751
this.logger.info("Session initialized successfully");
752+
this.logger.info(
753+
`Agent version: ${this.config.version ?? packageJson.version}`,
754+
);
750755

751756
// Signal in_progress so the UI can start polling for updates
752757
this.posthogAPI
@@ -1111,7 +1116,9 @@ Important:
11111116
): Promise<void> {
11121117
if (this.session?.payload.run_id === payload.run_id) {
11131118
try {
1114-
await this.session.logWriter.flush(payload.run_id);
1119+
await this.session.logWriter.flush(payload.run_id, {
1120+
coalesce: true,
1121+
});
11151122
} catch (error) {
11161123
this.logger.warn("Failed to flush session logs before completion", {
11171124
taskId: payload.task_id,
@@ -1270,7 +1277,7 @@ Important:
12701277
}
12711278

12721279
try {
1273-
await this.session.logWriter.flush(payload.run_id);
1280+
await this.session.logWriter.flush(payload.run_id, { coalesce: true });
12741281
} catch (error) {
12751282
this.logger.warn("Failed to flush logs before Slack relay", {
12761283
taskId: payload.task_id,
@@ -1473,7 +1480,9 @@ Important:
14731480
}
14741481

14751482
try {
1476-
await this.session.logWriter.flush(this.session.payload.run_id);
1483+
await this.session.logWriter.flush(this.session.payload.run_id, {
1484+
coalesce: true,
1485+
});
14771486
} catch (error) {
14781487
this.logger.error("Failed to flush session logs", error);
14791488
}

packages/agent/src/session-log-writer.test.ts

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,222 @@ describe("SessionLogWriter", () => {
159159
});
160160
});
161161

162+
describe("_doFlush does not prematurely coalesce", () => {
163+
it("does not coalesce buffered chunks during a timed flush", async () => {
164+
const sessionId = "s1";
165+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
166+
167+
// Buffer some chunks (no non-chunk event to trigger coalescing)
168+
logWriter.appendRawLine(
169+
sessionId,
170+
makeSessionUpdate("agent_message_chunk", {
171+
content: { type: "text", text: "Hello " },
172+
}),
173+
);
174+
logWriter.appendRawLine(
175+
sessionId,
176+
makeSessionUpdate("agent_message_chunk", {
177+
content: { type: "text", text: "world" },
178+
}),
179+
);
180+
181+
// Flush without any non-chunk event arriving — simulates
182+
// the 500ms debounce timer firing mid-stream
183+
await logWriter.flush(sessionId);
184+
185+
// No entries should have been sent — chunks are still buffered
186+
expect(mockAppendLog).not.toHaveBeenCalled();
187+
188+
// Now a non-chunk event arrives, triggering natural coalescing
189+
logWriter.appendRawLine(
190+
sessionId,
191+
makeSessionUpdate("usage_update", { used: 100 }),
192+
);
193+
194+
await logWriter.flush(sessionId);
195+
196+
expect(mockAppendLog).toHaveBeenCalledTimes(1);
197+
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
198+
expect(entries).toHaveLength(2); // coalesced agent_message + usage_update
199+
const coalesced = entries[0].notification;
200+
expect(coalesced.params?.update).toEqual({
201+
sessionUpdate: "agent_message",
202+
content: { type: "text", text: "Hello world" },
203+
});
204+
});
205+
});
206+
207+
describe("flushAll coalesces on shutdown", () => {
208+
it("coalesces remaining chunks before flushing", async () => {
209+
const sessionId = "s1";
210+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
211+
212+
logWriter.appendRawLine(
213+
sessionId,
214+
makeSessionUpdate("agent_message_chunk", {
215+
content: { type: "text", text: "partial response" },
216+
}),
217+
);
218+
219+
await logWriter.flushAll();
220+
221+
expect(mockAppendLog).toHaveBeenCalledTimes(1);
222+
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
223+
expect(entries).toHaveLength(1);
224+
const coalesced = entries[0].notification;
225+
expect(coalesced.params?.update).toEqual({
226+
sessionUpdate: "agent_message",
227+
content: { type: "text", text: "partial response" },
228+
});
229+
});
230+
});
231+
232+
describe("flush with coalesce option", () => {
233+
it("drains chunk buffer when coalesce is true", async () => {
234+
const sessionId = "s1";
235+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
236+
237+
logWriter.appendRawLine(
238+
sessionId,
239+
makeSessionUpdate("agent_message_chunk", {
240+
content: { type: "text", text: "complete text" },
241+
}),
242+
);
243+
244+
await logWriter.flush(sessionId, { coalesce: true });
245+
246+
expect(mockAppendLog).toHaveBeenCalledTimes(1);
247+
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
248+
const coalesced = entries[0].notification;
249+
expect(coalesced.params?.update).toEqual({
250+
sessionUpdate: "agent_message",
251+
content: { type: "text", text: "complete text" },
252+
});
253+
});
254+
255+
it("does not coalesce when coalesce is false", async () => {
256+
const sessionId = "s1";
257+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
258+
259+
logWriter.appendRawLine(
260+
sessionId,
261+
makeSessionUpdate("agent_message_chunk", {
262+
content: { type: "text", text: "buffered" },
263+
}),
264+
);
265+
266+
await logWriter.flush(sessionId, { coalesce: false });
267+
268+
expect(mockAppendLog).not.toHaveBeenCalled();
269+
});
270+
});
271+
272+
describe("direct agent_message supersedes chunks", () => {
273+
it("discards buffered chunks when a direct agent_message arrives", async () => {
274+
const sessionId = "s1";
275+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
276+
277+
// Buffer partial chunks
278+
logWriter.appendRawLine(
279+
sessionId,
280+
makeSessionUpdate("agent_message_chunk", {
281+
content: { type: "text", text: "partial " },
282+
}),
283+
);
284+
logWriter.appendRawLine(
285+
sessionId,
286+
makeSessionUpdate("agent_message_chunk", {
287+
content: { type: "text", text: "text" },
288+
}),
289+
);
290+
291+
// Direct agent_message arrives — authoritative full text
292+
logWriter.appendRawLine(
293+
sessionId,
294+
makeSessionUpdate("agent_message", {
295+
content: { type: "text", text: "complete full response" },
296+
}),
297+
);
298+
299+
await logWriter.flush(sessionId);
300+
301+
expect(mockAppendLog).toHaveBeenCalledTimes(1);
302+
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
303+
// Only the direct agent_message — no coalesced partial entry
304+
expect(entries).toHaveLength(1);
305+
const coalesced = entries[0].notification;
306+
expect(coalesced.params?.update).toEqual({
307+
sessionUpdate: "agent_message",
308+
content: { type: "text", text: "complete full response" },
309+
});
310+
expect(logWriter.getLastAgentMessage(sessionId)).toBe(
311+
"complete full response",
312+
);
313+
});
314+
315+
it("is additive with earlier coalesced text in multi-message turns", async () => {
316+
const sessionId = "s1";
317+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
318+
319+
// First assistant message: chunks coalesced by a tool_call event
320+
logWriter.appendRawLine(
321+
sessionId,
322+
makeSessionUpdate("agent_message_chunk", {
323+
content: { type: "text", text: "first message" },
324+
}),
325+
);
326+
logWriter.appendRawLine(
327+
sessionId,
328+
makeSessionUpdate("tool_call", { toolCallId: "tc1" }),
329+
);
330+
// "first message" is now coalesced into currentTurnMessages
331+
332+
// Second assistant message arrives as direct agent_message
333+
// (e.g., after tool result, no active chunk buffer)
334+
logWriter.appendRawLine(
335+
sessionId,
336+
makeSessionUpdate("agent_message", {
337+
content: { type: "text", text: "second message" },
338+
}),
339+
);
340+
341+
const response = logWriter.getFullAgentResponse(sessionId);
342+
// Both messages are preserved — direct message is additive
343+
expect(response).toBe("first message\n\nsecond message");
344+
});
345+
346+
it("persisted log does not contain stale entries when chunks are superseded", async () => {
347+
const sessionId = "s1";
348+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
349+
350+
// Chunks buffered, then direct agent_message supersedes before coalescing
351+
logWriter.appendRawLine(
352+
sessionId,
353+
makeSessionUpdate("agent_message_chunk", {
354+
content: { type: "text", text: "partial" },
355+
}),
356+
);
357+
logWriter.appendRawLine(
358+
sessionId,
359+
makeSessionUpdate("agent_message", {
360+
content: { type: "text", text: "complete" },
361+
}),
362+
);
363+
364+
await logWriter.flush(sessionId);
365+
366+
expect(mockAppendLog).toHaveBeenCalledTimes(1);
367+
const entries: StoredNotification[] = mockAppendLog.mock.calls[0][2];
368+
// Only the direct agent_message — no coalesced partial entry
369+
expect(entries).toHaveLength(1);
370+
const persisted = entries[0].notification;
371+
expect(persisted.params?.update).toEqual({
372+
sessionUpdate: "agent_message",
373+
content: { type: "text", text: "complete" },
374+
});
375+
});
376+
});
377+
162378
describe("register", () => {
163379
it("does not re-register existing sessions", () => {
164380
const sessionId = "s1";

packages/agent/src/session-log-writer.ts

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,12 @@ export class SessionLogWriter {
5454
}
5555

5656
async flushAll(): Promise<void> {
57-
const sessionIds = [...this.sessions.keys()];
57+
// Coalesce any in-progress chunk buffers before the final flush
58+
// During normal operation, chunks are coalesced when the next non-chunk
59+
// event arrives, but on shutdown there may be no subsequent event
5860
const flushPromises: Promise<void>[] = [];
59-
for (const sessionId of sessionIds) {
61+
for (const [sessionId, session] of this.sessions) {
62+
this.emitCoalescedMessage(sessionId, session);
6063
flushPromises.push(this.flush(sessionId));
6164
}
6265
await Promise.all(flushPromises);
@@ -123,8 +126,14 @@ export class SessionLogWriter {
123126
return;
124127
}
125128

126-
// Non-chunk event: flush any buffered chunks first
127-
this.emitCoalescedMessage(sessionId, session);
129+
// Non-chunk event: flush any buffered chunks first.
130+
// If this is a direct agent_message AND there are buffered chunks,
131+
// the direct message supersedes the partial chunks
132+
if (this.isDirectAgentMessage(message) && session.chunkBuffer) {
133+
session.chunkBuffer = undefined;
134+
} else {
135+
this.emitCoalescedMessage(sessionId, session);
136+
}
128137

129138
const nonChunkAgentText = this.extractAgentMessageText(message);
130139
if (nonChunkAgentText) {
@@ -155,7 +164,17 @@ export class SessionLogWriter {
155164
}
156165
}
157166

158-
async flush(sessionId: string): Promise<void> {
167+
async flush(
168+
sessionId: string,
169+
{ coalesce = false }: { coalesce?: boolean } = {},
170+
): Promise<void> {
171+
if (coalesce) {
172+
const session = this.sessions.get(sessionId);
173+
if (session) {
174+
this.emitCoalescedMessage(sessionId, session);
175+
}
176+
}
177+
159178
// Serialize flushes per session
160179
const prev = this.flushQueues.get(sessionId) ?? Promise.resolve();
161180
const next = prev.catch(() => {}).then(() => this._doFlush(sessionId));
@@ -175,9 +194,6 @@ export class SessionLogWriter {
175194
return;
176195
}
177196

178-
// Emit any buffered chunks before flushing
179-
this.emitCoalescedMessage(sessionId, session);
180-
181197
const pending = this.pendingEntries.get(sessionId);
182198
if (!this.posthogAPI || !pending?.length) {
183199
return;
@@ -231,11 +247,21 @@ export class SessionLogWriter {
231247
}
232248
}
233249

234-
private isAgentMessageChunk(message: Record<string, unknown>): boolean {
235-
if (message.method !== "session/update") return false;
250+
private getSessionUpdateType(
251+
message: Record<string, unknown>,
252+
): string | undefined {
253+
if (message.method !== "session/update") return undefined;
236254
const params = message.params as Record<string, unknown> | undefined;
237255
const update = params?.update as Record<string, unknown> | undefined;
238-
return update?.sessionUpdate === "agent_message_chunk";
256+
return update?.sessionUpdate as string | undefined;
257+
}
258+
259+
private isDirectAgentMessage(message: Record<string, unknown>): boolean {
260+
return this.getSessionUpdateType(message) === "agent_message";
261+
}
262+
263+
private isAgentMessageChunk(message: Record<string, unknown>): boolean {
264+
return this.getSessionUpdateType(message) === "agent_message_chunk";
239265
}
240266

241267
private extractChunkText(message: Record<string, unknown>): string {
@@ -290,6 +316,17 @@ export class SessionLogWriter {
290316
getFullAgentResponse(sessionId: string): string | undefined {
291317
const session = this.sessions.get(sessionId);
292318
if (!session || session.currentTurnMessages.length === 0) return undefined;
319+
320+
if (session.chunkBuffer) {
321+
this.logger.warn(
322+
"getFullAgentResponse called with non-empty chunk buffer",
323+
{
324+
sessionId,
325+
bufferedLength: session.chunkBuffer.text.length,
326+
},
327+
);
328+
}
329+
293330
return session.currentTurnMessages.join("\n\n");
294331
}
295332

0 commit comments

Comments
 (0)