diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts index 5960f3939..b41c2e5be 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts @@ -80,6 +80,7 @@ export function useSessionConnection({ () => { queryClient.invalidateQueries({ queryKey: ["tasks"] }); }, + task.latest_run?.log_url, ); return cleanup; }, [ @@ -91,6 +92,7 @@ export function useSessionConnection({ queryClient, task.id, task.latest_run?.id, + task.latest_run?.log_url, ]); useEffect(() => { diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index ae5f2ee99..7305679de 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -552,6 +552,59 @@ describe("SessionService", () => { ); }); + it("hydrates a fresh cloud session from persisted logs before replay arrives", async () => { + const service = getSessionService(); + const hydratedSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + taskTitle: "Cloud Task", + status: "disconnected", + isCloud: true, + events: [], + }); + + mockSessionStoreSetters.getSessionByTaskId.mockImplementation(() => { + return hydratedSession; + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(""); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue( + JSON.stringify({ + type: "notification", + timestamp: "2024-01-01T00:00:00Z", + notification: { + method: "session/update", + params: { + update: { + sessionUpdate: "assistant_message", + }, + }, + }, + }), + ); + mockTrpcLogs.writeLocalLogs.mutate.mockResolvedValue(undefined); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + undefined, + "https://logs.example.com/run-123", + ); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + events: [], + isCloud: true, + logUrl: "https://logs.example.com/run-123", + processedLineCount: 1, + }), + ); + }); + }); + it("ignores stale async starts when the same watcher is replaced", async () => { const service = getSessionService(); let resolveFirstWatchStart!: () => void; diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index c0bb1a8b9..12cfeb904 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1917,6 +1917,7 @@ export class SessionService { apiHost: string, teamId: number, onStatusChange?: () => void, + logUrl?: string, ): () => void { const taskRunId = runId; const startToken = ++this.nextCloudTaskWatchToken; @@ -1947,6 +1948,11 @@ export class SessionService { existing?.taskRunId === taskRunId && existing.events.length > 0 && existing.processedLineCount === undefined; + const shouldHydrateSession = + !existing || + existing.taskRunId !== taskRunId || + shouldResetExistingSession || + existing.events.length === 0; if ( !existing || @@ -1964,6 +1970,10 @@ export class SessionService { }); } + if (shouldHydrateSession) { + this.hydrateCloudTaskSessionFromLogs(taskId, taskRunId, logUrl); + } + // Subscribe before starting the main-process watcher so the first replayed // SSE/log burst cannot race ahead of the renderer subscription. const subscription = trpcClient.cloudTask.onUpdate.subscribe( @@ -2025,6 +2035,46 @@ export class SessionService { return () => {}; } + private hydrateCloudTaskSessionFromLogs( + taskId: string, + taskRunId: string, + logUrl?: string, + ): void { + void (async () => { + const { rawEntries } = await this.fetchSessionLogs(logUrl, taskRunId); + if (rawEntries.length === 0) { + return; + } + + const session = sessionStoreSetters.getSessionByTaskId(taskId); + if (!session || session.taskRunId !== taskRunId) { + return; + } + + // If live updates already populated a processed count, don't overwrite + // that newer state with the persisted baseline fetched during startup. + if ( + session.processedLineCount !== undefined && + session.processedLineCount > 0 + ) { + return; + } + + sessionStoreSetters.updateSession(taskRunId, { + events: convertStoredEntriesToEvents(rawEntries), + isCloud: true, + logUrl: logUrl ?? session.logUrl, + processedLineCount: rawEntries.length, + }); + })().catch((err: unknown) => { + log.warn("Failed to hydrate cloud task session from logs", { + taskId, + taskRunId, + err, + }); + }); + } + private isCurrentCloudTaskWatcher( taskId: string, runId: string,