Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions apps/code/src/main/services/cloud-task/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,78 @@ describe("CloudTaskService", () => {
);
});

it("ignores keepalive SSE events while keeping the stream open", async () => {
const updates: unknown[] = [];
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));

mockNetFetch
.mockResolvedValueOnce(
createJsonResponse({
id: "run-1",
status: "in_progress",
stage: "build",
output: null,
error_message: null,
branch: "main",
updated_at: "2026-01-01T00:00:00Z",
}),
)
.mockResolvedValueOnce(
createJsonResponse([], 200, { "X-Has-More": "false" }),
);

mockStreamFetch.mockResolvedValueOnce(
createOpenSseResponse(
'event: keepalive\ndata: {"type":"keepalive"}\n\nid: 2\ndata: {"type":"notification","timestamp":"2026-01-01T00:00:02Z","notification":{"jsonrpc":"2.0","method":"_posthog/console","params":{"sessionId":"run-1","level":"info","message":"live tail"}}}\n\n',
),
);

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

await waitFor(() => updates.length >= 2);

expect(updates).toEqual([
{
taskId: "task-1",
runId: "run-1",
kind: "snapshot",
newEntries: [],
totalEntryCount: 0,
status: "in_progress",
stage: "build",
output: null,
errorMessage: null,
branch: "main",
},
{
taskId: "task-1",
runId: "run-1",
kind: "logs",
newEntries: [
{
type: "notification",
timestamp: "2026-01-01T00:00:02Z",
notification: {
jsonrpc: "2.0",
method: "_posthog/console",
params: {
sessionId: "run-1",
level: "info",
message: "live tail",
},
},
},
],
totalEntryCount: 1,
},
]);
});

it("emits a retryable cloud error after repeated stream failures", async () => {
vi.useFakeTimers();

Expand Down
10 changes: 10 additions & 0 deletions apps/code/src/main/services/cloud-task/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,16 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {

watcher.reconnectAttempts = 0;

if (
event.event === "keepalive" ||
(typeof event.data === "object" &&
event.data !== null &&
"type" in event.data &&
event.data.type === "keepalive")
) {
return;
}

if (isTaskRunStateEvent(event.data)) {
if (this.applyTaskRunState(watcher, event.data)) {
if (!watcher.isBootstrapping && !isTerminalStatus(watcher.lastStatus)) {
Expand Down
63 changes: 63 additions & 0 deletions apps/code/src/renderer/features/sessions/service/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,35 @@ describe("SessionService", () => {
});

describe("connectToTask", () => {
it("skips local connection for cloud runs", async () => {
const service = getSessionService();

await service.connectToTask({
task: createMockTask({
latest_run: {
id: "run-123",
task: "task-123",
team: 123,
environment: "cloud",
status: "in_progress",
log_url: "https://logs.example.com/run-123",
error_message: null,
output: null,
state: {},
branch: "main",
created_at: "2024-01-01T00:00:00Z",
updated_at: "2024-01-01T00:00:00Z",
completed_at: null,
},
}),
repoPath: "/repo",
});

expect(mockAuth.fetchAuthState).not.toHaveBeenCalled();
expect(mockTrpcAgent.reconnect.mutate).not.toHaveBeenCalled();
expect(mockSessionStoreSetters.setSession).not.toHaveBeenCalled();
});

it("skips connection if already connected", async () => {
const service = getSessionService();
const mockSession = createMockSession({ status: "connected" });
Expand Down Expand Up @@ -457,6 +486,40 @@ describe("SessionService", () => {
});

describe("watchCloudTask", () => {
it("resets a same-run preloaded session before the first cloud snapshot", () => {
const service = getSessionService();
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
createMockSession({
taskRunId: "run-123",
taskId: "task-123",
taskTitle: "Cloud Task",
events: [{ type: "acp_message", ts: 1, message: { method: "test" } }],
}),
);

service.watchCloudTask(
"task-123",
"run-123",
"https://app.example.com",
2,
);

expect(mockSessionStoreSetters.setSession).toHaveBeenCalledWith(
expect.objectContaining({
taskRunId: "run-123",
taskId: "task-123",
taskTitle: "Cloud Task",
isCloud: true,
status: "disconnected",
events: [],
}),
);
expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith(
"run-123",
expect.objectContaining({ isCloud: true }),
);
});

it("subscribes to cloud updates before starting the watcher", async () => {
const service = getSessionService();

Expand Down
22 changes: 21 additions & 1 deletion apps/code/src/renderer/features/sessions/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,14 @@ export class SessionService {
const { id: taskId, latest_run: latestRun } = task;
const taskTitle = task.title || task.description || "Task";

if (latestRun?.environment === "cloud") {
log.info("Skipping local session connect for cloud run", {
taskId,
taskRunId: latestRun.id,
});
return;
}

try {
const auth = await this.getAuthCredentials();
if (!auth) {
Expand Down Expand Up @@ -1931,7 +1939,19 @@ export class SessionService {

// Create session in the store
const existing = sessionStoreSetters.getSessionByTaskId(taskId);
if (!existing || existing.taskRunId !== taskRunId) {
// A same-run session with history but no processedLineCount came from a
// non-cloud hydration path. Reset it so the cloud snapshot becomes the
// single source of truth instead of being appended on top.
const shouldResetExistingSession =
existing?.taskRunId === taskRunId &&
existing.events.length > 0 &&
existing.processedLineCount === undefined;

if (
!existing ||
existing.taskRunId !== taskRunId ||
shouldResetExistingSession
) {
const taskTitle = existing?.taskTitle ?? "Cloud Task";
const session = this.createBaseSession(taskRunId, taskId, taskTitle);
session.status = "disconnected";
Expand Down
Loading