diff --git a/packages/agent/src/session-log-writer.test.ts b/packages/agent/src/session-log-writer.test.ts index f99aec380..b78e7f4e5 100644 --- a/packages/agent/src/session-log-writer.test.ts +++ b/packages/agent/src/session-log-writer.test.ts @@ -168,4 +168,91 @@ describe("SessionLogWriter", () => { expect(logWriter.isRegistered(sessionId)).toBe(true); }); }); + + describe("flush serialization", () => { + it("serializes concurrent flush calls so they do not overlap", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + const callOrder: string[] = []; + let resolveFirst!: () => void; + const firstBlocked = new Promise((r) => { + resolveFirst = r; + }); + + mockAppendLog + .mockImplementationOnce(async () => { + callOrder.push("first-start"); + // Add a new entry while the first flush is in-flight + logWriter.appendRawLine(sessionId, JSON.stringify({ method: "b" })); + await firstBlocked; + callOrder.push("first-end"); + }) + .mockImplementationOnce(async () => { + callOrder.push("second-start"); + }); + + logWriter.appendRawLine(sessionId, JSON.stringify({ method: "a" })); + const flush1 = logWriter.flush(sessionId); + + // Wait for the first flush to be in-flight — "b" is added inside the mock + await vi.waitFor(() => expect(callOrder).toContain("first-start")); + + // Queue a second flush for the entry added while first was in-flight + const flush2 = logWriter.flush(sessionId); + + // First flush is blocked — second should not have started + expect(callOrder).not.toContain("second-start"); + + // Unblock first flush + resolveFirst?.(); + await flush1; + await flush2; + + // Second started only after first completed + expect(callOrder).toEqual(["first-start", "first-end", "second-start"]); + }); + + it("does not lose entries when flushes are serialized", async () => { + const sessionId = "s1"; + logWriter.register(sessionId, { taskId: "t1", runId: sessionId }); + + let resolveFirst!: () => void; + const firstBlocked = new Promise((r) => { + resolveFirst = r; + }); + + mockAppendLog + .mockImplementationOnce(async () => { + // Add a new entry while the first flush is in-flight — simulates + // the agent emitting end_turn while a scheduled flush is sending + // earlier entries to S3. + logWriter.appendRawLine(sessionId, JSON.stringify({ method: "b" })); + await firstBlocked; + }) + .mockImplementationOnce(async () => undefined); + + // Batch 1 + logWriter.appendRawLine(sessionId, JSON.stringify({ method: "a" })); + const flush1 = logWriter.flush(sessionId); + + // Wait for first flush to be in-flight (and "b" to be added) + await vi.waitFor(() => expect(mockAppendLog).toHaveBeenCalledTimes(1)); + + // Queue flush for the entry added while first was in-flight + const flush2 = logWriter.flush(sessionId); + + resolveFirst?.(); + await flush1; + await flush2; + + expect(mockAppendLog).toHaveBeenCalledTimes(2); + const batch1: StoredNotification[] = mockAppendLog.mock.calls[0][2]; + const batch2: StoredNotification[] = mockAppendLog.mock.calls[1][2]; + expect(batch1).toHaveLength(1); + expect(batch1[0].notification.method).toBe("a"); + expect(batch2).toHaveLength(1); + expect(batch2[0].notification.method).toBe("b"); + }); + }); }); diff --git a/packages/agent/src/session-log-writer.ts b/packages/agent/src/session-log-writer.ts index b615752ec..a49890096 100644 --- a/packages/agent/src/session-log-writer.ts +++ b/packages/agent/src/session-log-writer.ts @@ -40,6 +40,7 @@ export class SessionLogWriter { private lastFlushAttemptTime: Map = new Map(); private retryCounts: Map = new Map(); private sessions: Map = new Map(); + private flushQueues: Map> = new Map(); private logger: Logger; private localCachePath?: string; @@ -155,6 +156,19 @@ export class SessionLogWriter { } async flush(sessionId: string): Promise { + // Serialize flushes per session + const prev = this.flushQueues.get(sessionId) ?? Promise.resolve(); + const next = prev.catch(() => {}).then(() => this._doFlush(sessionId)); + this.flushQueues.set(sessionId, next); + next.finally(() => { + if (this.flushQueues.get(sessionId) === next) { + this.flushQueues.delete(sessionId); + } + }); + return next; + } + + private async _doFlush(sessionId: string): Promise { const session = this.sessions.get(sessionId); if (!session) { this.logger.warn("flush: no session found", { sessionId });