Skip to content

Commit 07bdacc

Browse files
authored
feat: Implement cloud follow-up resume for agent sessions (#1189)
1 parent f5a8813 commit 07bdacc

File tree

10 files changed

+516
-78
lines changed

10 files changed

+516
-78
lines changed

apps/code/src/renderer/api/posthogClient.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,12 +402,20 @@ export class PostHogAPIClient {
402402
}
403403
}
404404

405-
async runTaskInCloud(taskId: string, branch?: string | null): Promise<Task> {
405+
async runTaskInCloud(
406+
taskId: string,
407+
branch?: string | null,
408+
resumeOptions?: { resumeFromRunId: string; pendingUserMessage: string },
409+
): Promise<Task> {
406410
const teamId = await this.getTeamId();
407-
const body: Record<string, unknown> = {};
411+
const body: Record<string, unknown> = { mode: "interactive" };
408412
if (branch) {
409413
body.branch = branch;
410414
}
415+
if (resumeOptions) {
416+
body.resume_from_run_id = resumeOptions.resumeFromRunId;
417+
body.pending_user_message = resumeOptions.pendingUserMessage;
418+
}
411419

412420
const data = await this.api.post(
413421
`/api/projects/{project_id}/tasks/{id}/run/`,

apps/code/src/renderer/features/sessions/components/SessionView.tsx

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ interface SessionViewProps {
5555
onRetry?: () => void;
5656
onNewSession?: () => void;
5757
isInitializing?: boolean;
58-
readOnlyMessage?: string;
5958
slackThreadUrl?: string;
6059
}
6160

@@ -80,7 +79,6 @@ export function SessionView({
8079
onRetry,
8180
onNewSession,
8281
isInitializing = false,
83-
readOnlyMessage,
8482
slackThreadUrl,
8583
}: SessionViewProps) {
8684
const showRawLogs = useShowRawLogs();
@@ -439,22 +437,6 @@ export function SessionView({
439437
/>
440438
</Box>
441439
</Box>
442-
) : readOnlyMessage ? (
443-
<Box className="border-gray-4 border-t">
444-
<Box className="mx-auto max-w-[750px] p-2">
445-
<Flex align="center" justify="center" py="3">
446-
<Text
447-
size="2"
448-
style={{
449-
color: "var(--gray-9)",
450-
fontFamily: "var(--font-mono)",
451-
}}
452-
>
453-
{readOnlyMessage}
454-
</Text>
455-
</Flex>
456-
</Box>
457-
</Box>
458440
) : (
459441
<Box className="relative border-gray-4 border-t">
460442
<Box

apps/code/src/renderer/features/sessions/service/service.ts

Lines changed: 216 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@ import {
4444
notifyPermissionRequest,
4545
notifyPromptComplete,
4646
} from "@utils/notifications";
47+
import { queryClient } from "@utils/queryClient";
4748
import {
4849
convertStoredEntriesToEvents,
50+
createUserMessageEvent,
4951
createUserShellExecuteEvent,
5052
extractPromptText,
5153
getUserShellExecutesSinceLastPrompt,
@@ -1134,6 +1136,7 @@ export class SessionService {
11341136
private async sendCloudPrompt(
11351137
session: AgentSession,
11361138
prompt: string | ContentBlock[],
1139+
options?: { skipQueueGuard?: boolean },
11371140
): Promise<{ stopReason: string }> {
11381141
const promptText = extractPromptText(prompt);
11391142
if (!promptText.trim()) {
@@ -1142,10 +1145,10 @@ export class SessionService {
11421145

11431146
const terminalStatuses = new Set(["completed", "failed", "cancelled"]);
11441147
if (session.cloudStatus && terminalStatuses.has(session.cloudStatus)) {
1145-
throw new Error("This cloud run has already finished");
1148+
return this.resumeCloudRun(session, promptText);
11461149
}
11471150

1148-
if (session.isPromptPending) {
1151+
if (!options?.skipQueueGuard && session.isPromptPending) {
11491152
sessionStoreSetters.enqueueMessage(session.taskId, promptText);
11501153
log.info("Cloud message queued", {
11511154
taskId: session.taskId,
@@ -1216,24 +1219,140 @@ export class SessionService {
12161219

12171220
private async sendQueuedCloudMessages(
12181221
taskId: string,
1222+
attempt = 0,
1223+
pendingText?: string,
12191224
): Promise<{ stopReason: string }> {
1220-
const combinedText = sessionStoreSetters.dequeueMessagesAsText(taskId);
1221-
if (!combinedText) {
1222-
return { stopReason: "skipped" };
1223-
}
1225+
// First attempt: atomically dequeue. Retries reuse the already-dequeued text.
1226+
const combinedText =
1227+
pendingText ?? sessionStoreSetters.dequeueMessagesAsText(taskId);
1228+
if (!combinedText) return { stopReason: "skipped" };
12241229

12251230
const session = sessionStoreSetters.getSessionByTaskId(taskId);
12261231
if (!session) {
1227-
log.warn("No session found for queued cloud messages", { taskId });
1232+
log.warn("No session found for queued cloud messages, message lost", {
1233+
taskId,
1234+
});
12281235
return { stopReason: "no_session" };
12291236
}
12301237

12311238
log.info("Sending queued cloud messages", {
12321239
taskId,
12331240
promptLength: combinedText.length,
1241+
attempt,
12341242
});
12351243

1236-
return this.sendCloudPrompt(session, combinedText);
1244+
try {
1245+
return await this.sendCloudPrompt(session, combinedText, {
1246+
skipQueueGuard: true,
1247+
});
1248+
} catch (error) {
1249+
const maxRetries = 5;
1250+
if (attempt < maxRetries) {
1251+
const delayMs = Math.min(1000 * 2 ** attempt, 10_000);
1252+
log.warn("Cloud message send failed, scheduling retry", {
1253+
taskId,
1254+
attempt,
1255+
delayMs,
1256+
error: String(error),
1257+
});
1258+
return new Promise((resolve) => {
1259+
setTimeout(() => {
1260+
resolve(
1261+
this.sendQueuedCloudMessages(
1262+
taskId,
1263+
attempt + 1,
1264+
combinedText,
1265+
).catch((err) => {
1266+
log.error("Queued cloud message retry failed", {
1267+
taskId,
1268+
attempt: attempt + 1,
1269+
error: err,
1270+
});
1271+
return { stopReason: "error" };
1272+
}),
1273+
);
1274+
}, delayMs);
1275+
});
1276+
}
1277+
1278+
log.error("Queued cloud message send failed after max retries", {
1279+
taskId,
1280+
attempts: attempt + 1,
1281+
});
1282+
toast.error("Failed to send follow-up message. Please try again.");
1283+
return { stopReason: "error" };
1284+
}
1285+
}
1286+
1287+
private async resumeCloudRun(
1288+
session: AgentSession,
1289+
promptText: string,
1290+
): Promise<{ stopReason: string }> {
1291+
const client = useAuthStore.getState().client;
1292+
if (!client) {
1293+
throw new Error("Authentication required for cloud commands");
1294+
}
1295+
1296+
log.info("Creating resume run for terminal cloud task", {
1297+
taskId: session.taskId,
1298+
previousRunId: session.taskRunId,
1299+
previousStatus: session.cloudStatus,
1300+
});
1301+
1302+
// Create a new run WITH resume context — backend validates the previous run,
1303+
// derives snapshot_external_id server-side, and passes everything as extra_state.
1304+
// The agent will load conversation history and restore the sandbox snapshot.
1305+
const updatedTask = await client.runTaskInCloud(
1306+
session.taskId,
1307+
session.cloudBranch,
1308+
{
1309+
resumeFromRunId: session.taskRunId,
1310+
pendingUserMessage: promptText,
1311+
},
1312+
);
1313+
const newRun = updatedTask.latest_run;
1314+
if (!newRun?.id) {
1315+
throw new Error("Failed to create resume run");
1316+
}
1317+
1318+
// Replace session with one for the new run, preserving conversation history.
1319+
// setSession handles old session cleanup via taskIdIndex.
1320+
const newSession = this.createBaseSession(
1321+
newRun.id,
1322+
session.taskId,
1323+
session.taskTitle,
1324+
);
1325+
newSession.status = "disconnected";
1326+
newSession.isCloud = true;
1327+
// Carry over existing events and add optimistic user bubble for the follow-up.
1328+
// Reset processedLineCount to 0 because the new run's log stream starts fresh.
1329+
newSession.events = [
1330+
...session.events,
1331+
createUserMessageEvent(promptText, Date.now()),
1332+
];
1333+
newSession.processedLineCount = 0;
1334+
// Skip the first session/prompt from polled logs — we already have the
1335+
// optimistic user event, so showing the polled one would duplicate it.
1336+
newSession.skipPolledPromptCount = 1;
1337+
sessionStoreSetters.setSession(newSession);
1338+
1339+
// No enqueueMessage / isPromptPending needed — the follow-up is passed
1340+
// in run state (pending_user_message), NOT via user_message command.
1341+
1342+
// Start the watcher immediately so we don't miss status updates.
1343+
this.watchCloudTask(session.taskId, newRun.id);
1344+
1345+
// Invalidate task queries so the UI picks up the new run metadata
1346+
queryClient.invalidateQueries({ queryKey: ["tasks"] });
1347+
1348+
track(ANALYTICS_EVENTS.PROMPT_SENT, {
1349+
task_id: session.taskId,
1350+
is_initial: false,
1351+
execution_type: "cloud",
1352+
prompt_length_chars: promptText.length,
1353+
});
1354+
1355+
return { stopReason: "queued" };
12371356
}
12381357

12391358
private async cancelCloudPrompt(session: AgentSession): Promise<boolean> {
@@ -1772,7 +1891,12 @@ export class SessionService {
17721891
} else if (delta <= update.newEntries.length) {
17731892
// Normal case: append only the tail (last `delta` entries)
17741893
const entriesToAppend = update.newEntries.slice(-delta);
1775-
const newEvents = convertStoredEntriesToEvents(entriesToAppend);
1894+
let newEvents = convertStoredEntriesToEvents(entriesToAppend);
1895+
newEvents = this.filterSkippedPromptEvents(
1896+
taskRunId,
1897+
session,
1898+
newEvents,
1899+
);
17761900
sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount);
17771901
this.updatePromptStateFromEvents(taskRunId, newEvents);
17781902
} else {
@@ -1783,7 +1907,12 @@ export class SessionService {
17831907
expectedCount,
17841908
entriesReceived: update.newEntries.length,
17851909
});
1786-
const newEvents = convertStoredEntriesToEvents(update.newEntries);
1910+
let newEvents = convertStoredEntriesToEvents(update.newEntries);
1911+
newEvents = this.filterSkippedPromptEvents(
1912+
taskRunId,
1913+
session,
1914+
newEvents,
1915+
);
17871916
sessionStoreSetters.appendEvents(
17881917
taskRunId,
17891918
newEvents,
@@ -1792,6 +1921,22 @@ export class SessionService {
17921921
this.updatePromptStateFromEvents(taskRunId, newEvents);
17931922
}
17941923
}
1924+
1925+
// Flush queued messages when a cloud turn completes (detected via log polling)
1926+
const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId];
1927+
if (
1928+
sessionAfterLogs &&
1929+
!sessionAfterLogs.isPromptPending &&
1930+
sessionAfterLogs.messageQueue.length > 0
1931+
) {
1932+
this.sendQueuedCloudMessages(sessionAfterLogs.taskId).catch((err) => {
1933+
log.error("Failed to send queued cloud messages after turn complete", {
1934+
taskId: sessionAfterLogs.taskId,
1935+
error: err,
1936+
});
1937+
});
1938+
}
1939+
17951940
// Update cloud status fields if present
17961941
if (update.kind === "status" || update.kind === "snapshot") {
17971942
sessionStoreSetters.updateCloudStatus(taskRunId, {
@@ -1802,13 +1947,74 @@ export class SessionService {
18021947
branch: update.branch,
18031948
});
18041949

1950+
// Auto-send queued messages when a resumed run becomes active
1951+
if (update.status === "in_progress") {
1952+
const session = sessionStoreSetters.getSessions()[taskRunId];
1953+
if (session && session.messageQueue.length > 0) {
1954+
// Clear the pending flag first — resumeCloudRun sets it as a guard
1955+
// while waiting for the run to start. Now that the run is active,
1956+
// sendCloudPrompt needs the flag clear to actually send.
1957+
sessionStoreSetters.updateSession(taskRunId, {
1958+
isPromptPending: false,
1959+
});
1960+
this.sendQueuedCloudMessages(session.taskId).catch(() => {
1961+
// Retries exhausted — message was re-enqueued by
1962+
// sendQueuedCloudMessages, poll-based flush will keep trying
1963+
});
1964+
}
1965+
}
1966+
18051967
const terminalStatuses = new Set(["completed", "failed", "cancelled"]);
18061968
if (update.status && terminalStatuses.has(update.status)) {
1969+
// Clean up any pending resume messages that couldn't be sent
1970+
const session = sessionStoreSetters.getSessions()[taskRunId];
1971+
if (
1972+
session &&
1973+
(session.messageQueue.length > 0 || session.isPromptPending)
1974+
) {
1975+
sessionStoreSetters.clearMessageQueue(session.taskId);
1976+
sessionStoreSetters.updateSession(taskRunId, {
1977+
isPromptPending: false,
1978+
});
1979+
}
18071980
this.stopCloudTaskWatch(update.taskId);
18081981
}
18091982
}
18101983
}
18111984

1985+
/**
1986+
* Filter out session/prompt events that should be skipped during resume.
1987+
* When resuming a cloud run, the initial session/prompt from the new run's
1988+
* logs would duplicate the optimistic user bubble we already added.
1989+
*/
1990+
// Note: `session` is a snapshot from the start of handleCloudTaskUpdate.
1991+
// The updateSession call below makes it stale, but this is safe because
1992+
// skipPolledPromptCount is only ever 1, so this method runs at most once.
1993+
private filterSkippedPromptEvents(
1994+
taskRunId: string,
1995+
session: AgentSession | undefined,
1996+
events: AcpMessage[],
1997+
): AcpMessage[] {
1998+
if (!session?.skipPolledPromptCount || session.skipPolledPromptCount <= 0) {
1999+
return events;
2000+
}
2001+
2002+
const promptIdx = events.findIndex(
2003+
(e) =>
2004+
isJsonRpcRequest(e.message) && e.message.method === "session/prompt",
2005+
);
2006+
if (promptIdx !== -1) {
2007+
const filtered = [...events];
2008+
filtered.splice(promptIdx, 1);
2009+
sessionStoreSetters.updateSession(taskRunId, {
2010+
skipPolledPromptCount: (session.skipPolledPromptCount ?? 0) - 1,
2011+
});
2012+
return filtered;
2013+
}
2014+
2015+
return events;
2016+
}
2017+
18122018
// --- Helper Methods ---
18132019

18142020
private getAuthCredentials(): AuthCredentials | null {

apps/code/src/renderer/features/sessions/stores/sessionStore.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ export interface AgentSession {
6161
cloudErrorMessage?: string | null;
6262
/** Cloud task branch */
6363
cloudBranch?: string | null;
64+
/** Number of session/prompt events to skip from polled logs (set during resume) */
65+
skipPolledPromptCount?: number;
6466
}
6567

6668
// --- Config Option Helpers ---

0 commit comments

Comments
 (0)