Skip to content

Commit ada314f

Browse files
authored
feat(agent): session log serialization (#1391)
1 parent 663966a commit ada314f

2 files changed

Lines changed: 101 additions & 0 deletions

File tree

packages/agent/src/session-log-writer.test.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,4 +168,91 @@ describe("SessionLogWriter", () => {
168168
expect(logWriter.isRegistered(sessionId)).toBe(true);
169169
});
170170
});
171+
172+
describe("flush serialization", () => {
173+
it("serializes concurrent flush calls so they do not overlap", async () => {
174+
const sessionId = "s1";
175+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
176+
177+
const callOrder: string[] = [];
178+
let resolveFirst!: () => void;
179+
const firstBlocked = new Promise<void>((r) => {
180+
resolveFirst = r;
181+
});
182+
183+
mockAppendLog
184+
.mockImplementationOnce(async () => {
185+
callOrder.push("first-start");
186+
// Add a new entry while the first flush is in-flight
187+
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "b" }));
188+
await firstBlocked;
189+
callOrder.push("first-end");
190+
})
191+
.mockImplementationOnce(async () => {
192+
callOrder.push("second-start");
193+
});
194+
195+
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "a" }));
196+
const flush1 = logWriter.flush(sessionId);
197+
198+
// Wait for the first flush to be in-flight — "b" is added inside the mock
199+
await vi.waitFor(() => expect(callOrder).toContain("first-start"));
200+
201+
// Queue a second flush for the entry added while first was in-flight
202+
const flush2 = logWriter.flush(sessionId);
203+
204+
// First flush is blocked — second should not have started
205+
expect(callOrder).not.toContain("second-start");
206+
207+
// Unblock first flush
208+
resolveFirst?.();
209+
await flush1;
210+
await flush2;
211+
212+
// Second started only after first completed
213+
expect(callOrder).toEqual(["first-start", "first-end", "second-start"]);
214+
});
215+
216+
it("does not lose entries when flushes are serialized", async () => {
217+
const sessionId = "s1";
218+
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });
219+
220+
let resolveFirst!: () => void;
221+
const firstBlocked = new Promise<void>((r) => {
222+
resolveFirst = r;
223+
});
224+
225+
mockAppendLog
226+
.mockImplementationOnce(async () => {
227+
// Add a new entry while the first flush is in-flight — simulates
228+
// the agent emitting end_turn while a scheduled flush is sending
229+
// earlier entries to S3.
230+
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "b" }));
231+
await firstBlocked;
232+
})
233+
.mockImplementationOnce(async () => undefined);
234+
235+
// Batch 1
236+
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "a" }));
237+
const flush1 = logWriter.flush(sessionId);
238+
239+
// Wait for first flush to be in-flight (and "b" to be added)
240+
await vi.waitFor(() => expect(mockAppendLog).toHaveBeenCalledTimes(1));
241+
242+
// Queue flush for the entry added while first was in-flight
243+
const flush2 = logWriter.flush(sessionId);
244+
245+
resolveFirst?.();
246+
await flush1;
247+
await flush2;
248+
249+
expect(mockAppendLog).toHaveBeenCalledTimes(2);
250+
const batch1: StoredNotification[] = mockAppendLog.mock.calls[0][2];
251+
const batch2: StoredNotification[] = mockAppendLog.mock.calls[1][2];
252+
expect(batch1).toHaveLength(1);
253+
expect(batch1[0].notification.method).toBe("a");
254+
expect(batch2).toHaveLength(1);
255+
expect(batch2[0].notification.method).toBe("b");
256+
});
257+
});
171258
});

packages/agent/src/session-log-writer.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ export class SessionLogWriter {
4040
private lastFlushAttemptTime: Map<string, number> = new Map();
4141
private retryCounts: Map<string, number> = new Map();
4242
private sessions: Map<string, SessionState> = new Map();
43+
private flushQueues: Map<string, Promise<void>> = new Map();
4344

4445
private logger: Logger;
4546
private localCachePath?: string;
@@ -155,6 +156,19 @@ export class SessionLogWriter {
155156
}
156157

157158
async flush(sessionId: string): Promise<void> {
159+
// Serialize flushes per session
160+
const prev = this.flushQueues.get(sessionId) ?? Promise.resolve();
161+
const next = prev.catch(() => {}).then(() => this._doFlush(sessionId));
162+
this.flushQueues.set(sessionId, next);
163+
next.finally(() => {
164+
if (this.flushQueues.get(sessionId) === next) {
165+
this.flushQueues.delete(sessionId);
166+
}
167+
});
168+
return next;
169+
}
170+
171+
private async _doFlush(sessionId: string): Promise<void> {
158172
const session = this.sessions.get(sessionId);
159173
if (!session) {
160174
this.logger.warn("flush: no session found", { sessionId });

0 commit comments

Comments
 (0)