diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index 0f112dded..d1032cd15 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -322,6 +322,78 @@ describe("CloudTaskService", () => { ); }); + it("ignores keepalive SSE events while keeping the stream open", async () => { + const updates: unknown[] = []; + service.on(CloudTaskEvent.Update, (payload) => updates.push(payload)); + + mockNetFetch + .mockResolvedValueOnce( + createJsonResponse({ + id: "run-1", + status: "in_progress", + stage: "build", + output: null, + error_message: null, + branch: "main", + updated_at: "2026-01-01T00:00:00Z", + }), + ) + .mockResolvedValueOnce( + createJsonResponse([], 200, { "X-Has-More": "false" }), + ); + + mockStreamFetch.mockResolvedValueOnce( + createOpenSseResponse( + 'event: keepalive\ndata: {"type":"keepalive"}\n\nid: 2\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:02Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"live tail"}}}\n\n', + ), + ); + + service.watch({ + taskId: "task-1", + runId: "run-1", + apiHost: "https://app.example.com", + teamId: 2, + }); + + await waitFor(() => updates.length >= 2); + + expect(updates).toEqual([ + { + taskId: "task-1", + runId: "run-1", + kind: "snapshot", + newEntries: [], + totalEntryCount: 0, + status: "in_progress", + stage: "build", + output: null, + errorMessage: null, + branch: "main", + }, + { + taskId: "task-1", + runId: "run-1", + kind: "logs", + newEntries: [ + { + type: "notification", + timestamp: "2026-01-01T00:00:02Z", + notification: { + jsonrpc: "2.0", + method: "_posthog/console", + params: { + sessionId: "run-1", + level: "info", + message: "live tail", + }, + }, + }, + ], + totalEntryCount: 1, + }, + ]); + }); + it("emits a retryable cloud error after repeated stream failures", async () => { vi.useFakeTimers(); diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index d6a1d06c8..753f23c35 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -654,6 +654,16 @@ export class CloudTaskService extends TypedEventEmitter { watcher.reconnectAttempts = 0; + if ( + event.event === "keepalive" || + (typeof event.data === "object" && + event.data !== null && + "type" in event.data && + event.data.type === "keepalive") + ) { + return; + } + if (isTaskRunStateEvent(event.data)) { if (this.applyTaskRunState(watcher, event.data)) { if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) { diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 924c3cd42..ae5f2ee99 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -298,6 +298,35 @@ describe("SessionService", () => { }); describe("connectToTask", () => { + it("skips local connection for cloud runs", async () => { + const service = getSessionService(); + + await service.connectToTask({ + task: createMockTask({ + latest_run: { + id: "run-123", + task: "task-123", + team: 123, + environment: "cloud", + status: "in_progress", + log_url: "https://logs.example.com/run-123", + error_message: null, + output: null, + state: {}, + branch: "main", + created_at: "2024-01-01T00:00:00Z", + updated_at: "2024-01-01T00:00:00Z", + completed_at: null, + }, + }), + repoPath: "/repo", + }); + + expect(mockAuth.fetchAuthState).not.toHaveBeenCalled(); + expect(mockTrpcAgent.reconnect.mutate).not.toHaveBeenCalled(); + expect(mockSessionStoreSetters.setSession).not.toHaveBeenCalled(); + }); + it("skips connection if already connected", async () => { const service = getSessionService(); const mockSession = createMockSession({ status: "connected" }); @@ -457,6 +486,40 @@ describe("SessionService", () => { }); describe("watchCloudTask", () => { + it("resets a same-run preloaded session before the first cloud snapshot", () => { + const service = getSessionService(); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + taskTitle: "Cloud Task", + events: [{ type: "acp_message", ts: 1, message: { method: "test" } }], + }), + ); + + service.watchCloudTask( + "task-123", + "run-123", + "https://app.example.com", + 2, + ); + + expect(mockSessionStoreSetters.setSession).toHaveBeenCalledWith( + expect.objectContaining({ + taskRunId: "run-123", + taskId: "task-123", + taskTitle: "Cloud Task", + isCloud: true, + status: "disconnected", + events: [], + }), + ); + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ isCloud: true }), + ); + }); + it("subscribes to cloud updates before starting the watcher", async () => { const service = getSessionService(); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index d6e457199..2e20eaa82 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -200,6 +200,14 @@ export class SessionService { const { id: taskId, latest_run: latestRun } = task; const taskTitle = task.title || task.description || "Task"; + if (latestRun?.environment === "cloud") { + log.info("Skipping local session connect for cloud run", { + taskId, + taskRunId: latestRun.id, + }); + return; + } + try { const auth = await this.getAuthCredentials(); if (!auth) { @@ -1931,7 +1939,19 @@ export class SessionService { // Create session in the store const existing = sessionStoreSetters.getSessionByTaskId(taskId); - if (!existing || existing.taskRunId !== taskRunId) { + // A same-run session with history but no processedLineCount came from a + // non-cloud hydration path. Reset it so the cloud snapshot becomes the + // single source of truth instead of being appended on top. + const shouldResetExistingSession = + existing?.taskRunId === taskRunId && + existing.events.length > 0 && + existing.processedLineCount === undefined; + + if ( + !existing || + existing.taskRunId !== taskRunId || + shouldResetExistingSession + ) { const taskTitle = existing?.taskTitle ?? "Cloud Task"; const session = this.createBaseSession(taskRunId, taskId, taskTitle); session.status = "disconnected";