Skip to content

Commit 6a13d32

Browse files
authored
fix(code): hydrate cloud runs after renderer reload (#1631)
1 parent a167fe7 commit 6a13d32

3 files changed

Lines changed: 105 additions & 0 deletions

File tree

apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ export function useSessionConnection({
8080
() => {
8181
queryClient.invalidateQueries({ queryKey: ["tasks"] });
8282
},
83+
task.latest_run?.log_url,
8384
);
8485
return cleanup;
8586
}, [
@@ -91,6 +92,7 @@ export function useSessionConnection({
9192
queryClient,
9293
task.id,
9394
task.latest_run?.id,
95+
task.latest_run?.log_url,
9496
]);
9597

9698
useEffect(() => {

apps/code/src/renderer/features/sessions/service/service.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,59 @@ describe("SessionService", () => {
552552
);
553553
});
554554

555+
it("hydrates a fresh cloud session from persisted logs before replay arrives", async () => {
556+
const service = getSessionService();
557+
const hydratedSession = createMockSession({
558+
taskRunId: "run-123",
559+
taskId: "task-123",
560+
taskTitle: "Cloud Task",
561+
status: "disconnected",
562+
isCloud: true,
563+
events: [],
564+
});
565+
566+
mockSessionStoreSetters.getSessionByTaskId.mockImplementation(() => {
567+
return hydratedSession;
568+
});
569+
mockTrpcLogs.readLocalLogs.query.mockResolvedValue("");
570+
mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(
571+
JSON.stringify({
572+
type: "notification",
573+
timestamp: "2024-01-01T00:00:00Z",
574+
notification: {
575+
method: "session/update",
576+
params: {
577+
update: {
578+
sessionUpdate: "assistant_message",
579+
},
580+
},
581+
},
582+
}),
583+
);
584+
mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined);
585+
586+
service.watchCloudTask(
587+
"task-123",
588+
"run-123",
589+
"https://api.anthropic.com",
590+
123,
591+
undefined,
592+
"https://logs.example.com/run-123",
593+
);
594+
595+
await vi.waitFor(() => {
596+
expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith(
597+
"run-123",
598+
expect.objectContaining({
599+
events: [],
600+
isCloud: true,
601+
logUrl: "https://logs.example.com/run-123",
602+
processedLineCount: 1,
603+
}),
604+
);
605+
});
606+
});
607+
555608
it("ignores stale async starts when the same watcher is replaced", async () => {
556609
const service = getSessionService();
557610
let resolveFirstWatchStart!: () => void;

apps/code/src/renderer/features/sessions/service/service.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1917,6 +1917,7 @@ export class SessionService {
19171917
apiHost: string,
19181918
teamId: number,
19191919
onStatusChange?: () => void,
1920+
logUrl?: string,
19201921
): () => void {
19211922
const taskRunId = runId;
19221923
const startToken = ++this.nextCloudTaskWatchToken;
@@ -1947,6 +1948,11 @@ export class SessionService {
19471948
existing?.taskRunId === taskRunId &&
19481949
existing.events.length > 0 &&
19491950
existing.processedLineCount === undefined;
1951+
const shouldHydrateSession =
1952+
!existing ||
1953+
existing.taskRunId !== taskRunId ||
1954+
shouldResetExistingSession ||
1955+
existing.events.length === 0;
19501956

19511957
if (
19521958
!existing ||
@@ -1964,6 +1970,10 @@ export class SessionService {
19641970
});
19651971
}
19661972

1973+
if (shouldHydrateSession) {
1974+
this.hydrateCloudTaskSessionFromLogs(taskId, taskRunId, logUrl);
1975+
}
1976+
19671977
// Subscribe before starting the main-process watcher so the first replayed
19681978
// SSE/log burst cannot race ahead of the renderer subscription.
19691979
const subscription = trpcClient.cloudTask.onUpdate.subscribe(
@@ -2025,6 +2035,46 @@ export class SessionService {
20252035
return () => {};
20262036
}
20272037

2038+
private hydrateCloudTaskSessionFromLogs(
2039+
taskId: string,
2040+
taskRunId: string,
2041+
logUrl?: string,
2042+
): void {
2043+
void (async () => {
2044+
const { rawEntries } = await this.fetchSessionLogs(logUrl, taskRunId);
2045+
if (rawEntries.length === 0) {
2046+
return;
2047+
}
2048+
2049+
const session = sessionStoreSetters.getSessionByTaskId(taskId);
2050+
if (!session || session.taskRunId !== taskRunId) {
2051+
return;
2052+
}
2053+
2054+
// If live updates already populated a processed count, don't overwrite
2055+
// that newer state with the persisted baseline fetched during startup.
2056+
if (
2057+
session.processedLineCount !== undefined &&
2058+
session.processedLineCount > 0
2059+
) {
2060+
return;
2061+
}
2062+
2063+
sessionStoreSetters.updateSession(taskRunId, {
2064+
events: convertStoredEntriesToEvents(rawEntries),
2065+
isCloud: true,
2066+
logUrl: logUrl ?? session.logUrl,
2067+
processedLineCount: rawEntries.length,
2068+
});
2069+
})().catch((err: unknown) => {
2070+
log.warn("Failed to hydrate cloud task session from logs", {
2071+
taskId,
2072+
taskRunId,
2073+
err,
2074+
});
2075+
});
2076+
}
2077+
20282078
private isCurrentCloudTaskWatcher(
20292079
taskId: string,
20302080
runId: string,

0 commit comments

Comments
 (0)