Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions packages/agent/src/session-log-writer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,91 @@ describe("SessionLogWriter", () => {
expect(logWriter.isRegistered(sessionId)).toBe(true);
});
});

describe("flush serialization", () => {
it("serializes concurrent flush calls so they do not overlap", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

const callOrder: string[] = [];
let resolveFirst!: () => void;
const firstBlocked = new Promise<void>((r) => {
resolveFirst = r;
});

mockAppendLog
.mockImplementationOnce(async () => {
callOrder.push("first-start");
// Add a new entry while the first flush is in-flight
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "b" }));
await firstBlocked;
callOrder.push("first-end");
})
.mockImplementationOnce(async () => {
callOrder.push("second-start");
});

logWriter.appendRawLine(sessionId, JSON.stringify({ method: "a" }));
const flush1 = logWriter.flush(sessionId);

// Wait for the first flush to be in-flight — "b" is added inside the mock
await vi.waitFor(() => expect(callOrder).toContain("first-start"));

// Queue a second flush for the entry added while first was in-flight
const flush2 = logWriter.flush(sessionId);

// First flush is blocked — second should not have started
expect(callOrder).not.toContain("second-start");

// Unblock first flush
resolveFirst?.();
await flush1;
await flush2;

// Second started only after first completed
expect(callOrder).toEqual(["first-start", "first-end", "second-start"]);
});

it("does not lose entries when flushes are serialized", async () => {
const sessionId = "s1";
logWriter.register(sessionId, { taskId: "t1", runId: sessionId });

let resolveFirst!: () => void;
const firstBlocked = new Promise<void>((r) => {
resolveFirst = r;
});

mockAppendLog
.mockImplementationOnce(async () => {
// Add a new entry while the first flush is in-flight — simulates
// the agent emitting end_turn while a scheduled flush is sending
// earlier entries to S3.
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "b" }));
await firstBlocked;
})
.mockImplementationOnce(async () => undefined);

// Batch 1
logWriter.appendRawLine(sessionId, JSON.stringify({ method: "a" }));
const flush1 = logWriter.flush(sessionId);

// Wait for first flush to be in-flight (and "b" to be added)
await vi.waitFor(() => expect(mockAppendLog).toHaveBeenCalledTimes(1));

// Queue flush for the entry added while first was in-flight
const flush2 = logWriter.flush(sessionId);

resolveFirst?.();
await flush1;
await flush2;

expect(mockAppendLog).toHaveBeenCalledTimes(2);
const batch1: StoredNotification[] = mockAppendLog.mock.calls[0][2];
const batch2: StoredNotification[] = mockAppendLog.mock.calls[1][2];
expect(batch1).toHaveLength(1);
expect(batch1[0].notification.method).toBe("a");
expect(batch2).toHaveLength(1);
expect(batch2[0].notification.method).toBe("b");
});
});
});
14 changes: 14 additions & 0 deletions packages/agent/src/session-log-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class SessionLogWriter {
private lastFlushAttemptTime: Map<string, number> = new Map();
private retryCounts: Map<string, number> = new Map();
private sessions: Map<string, SessionState> = new Map();
private flushQueues: Map<string, Promise<void>> = new Map();

private logger: Logger;
private localCachePath?: string;
Expand Down Expand Up @@ -155,6 +156,19 @@ export class SessionLogWriter {
}

async flush(sessionId: string): Promise<void> {
// Serialize flushes per session
const prev = this.flushQueues.get(sessionId) ?? Promise.resolve();
const next = prev.catch(() => {}).then(() => this._doFlush(sessionId));
this.flushQueues.set(sessionId, next);
next.finally(() => {
if (this.flushQueues.get(sessionId) === next) {
this.flushQueues.delete(sessionId);
}
});
return next;
}

private async _doFlush(sessionId: string): Promise<void> {
const session = this.sessions.get(sessionId);
if (!session) {
this.logger.warn("flush: no session found", { sessionId });
Expand Down
Loading