diff --git a/apps/code/src/main/services/git/service.ts b/apps/code/src/main/services/git/service.ts index 4eef16587..595f24550 100644 --- a/apps/code/src/main/services/git/service.ts +++ b/apps/code/src/main/services/git/service.ts @@ -104,13 +104,12 @@ function toUnifiedDiffPatch( @injectable() export class GitService extends TypedEventEmitter { private lastFetchTime = new Map(); - private llmGateway: LlmGatewayService; constructor( - @inject(MAIN_TOKENS.LlmGatewayService) llmGateway: LlmGatewayService, + @inject(MAIN_TOKENS.LlmGatewayService) + private readonly llmGateway: LlmGatewayService, ) { super(); - this.llmGateway = llmGateway; } private async getStateSnapshot( @@ -1086,85 +1085,71 @@ export class GitService extends TypedEventEmitter { const [owner, repoName] = parts; - try { - const repoResult = await execGh([ - "api", - `repos/${owner}/${repoName}`, - "--jq", - ".default_branch", - ]); + const repoResult = await execGh([ + "api", + `repos/${owner}/${repoName}`, + "--jq", + ".default_branch", + ]); - if (repoResult.exitCode !== 0 || !repoResult.stdout.trim()) { - return []; - } - const defaultBranch = repoResult.stdout.trim(); + if (repoResult.exitCode !== 0 || !repoResult.stdout.trim()) { + return []; + } + const defaultBranch = repoResult.stdout.trim(); - const result = await execGh([ - "api", - `repos/${owner}/${repoName}/compare/${defaultBranch}...${branch}`, - ]); + const result = await execGh([ + "api", + `repos/${owner}/${repoName}/compare/${defaultBranch}...${branch}`, + ]); - if (result.exitCode !== 0) { - throw new Error( - `Failed to fetch branch files: ${result.stderr || result.error || "Unknown error"}`, - ); + if (result.exitCode !== 0) { + throw new Error( + `Failed to fetch branch files: ${result.stderr || result.error || "Unknown error"}`, + ); + } + + const response = JSON.parse(result.stdout) as { + files?: Array<{ + filename: string; + status: string; + previous_filename?: string; + additions: number; + deletions: number; + patch?: string; + }>; + }; + const files = response.files; + + if (!files) return []; + + return files.map((f) => { + let status: ChangedFile["status"]; + switch (f.status) { + case "added": + status = "added"; + break; + case "removed": + status = "deleted"; + break; + case "renamed": + status = "renamed"; + break; + default: + status = "modified"; + break; } - const response = JSON.parse(result.stdout) as { - files?: Array<{ - filename: string; - status: string; - previous_filename?: string; - additions: number; - deletions: number; - patch?: string; - }>; + return { + path: f.filename, + status, + originalPath: f.previous_filename, + linesAdded: f.additions, + linesRemoved: f.deletions, + patch: f.patch + ? toUnifiedDiffPatch(f.patch, f.filename, f.previous_filename, status) + : undefined, }; - const files = response.files; - - if (!files) return []; - - return files.map((f) => { - let status: ChangedFile["status"]; - switch (f.status) { - case "added": - status = "added"; - break; - case "removed": - status = "deleted"; - break; - case "renamed": - status = "renamed"; - break; - default: - status = "modified"; - break; - } - - return { - path: f.filename, - status, - originalPath: f.previous_filename, - linesAdded: f.additions, - linesRemoved: f.deletions, - patch: f.patch - ? toUnifiedDiffPatch( - f.patch, - f.filename, - f.previous_filename, - status, - ) - : undefined, - }; - }); - } catch (error) { - log.warn("Failed to fetch branch changed files", { - repo, - branch, - error, - }); - throw error; - } + }); } public async generateCommitMessage( diff --git a/apps/code/src/main/services/handoff/handoff-saga.test.ts b/apps/code/src/main/services/handoff/handoff-saga.test.ts index 0137b2b4c..c7221e874 100644 --- a/apps/code/src/main/services/handoff/handoff-saga.test.ts +++ b/apps/code/src/main/services/handoff/handoff-saga.test.ts @@ -73,6 +73,7 @@ function createDeps(overrides: Partial = {}): HandoffSagaDeps { getTaskRun: vi.fn().mockResolvedValue({ log_url: "https://logs.example.com/run-1.ndjson", }), + updateTaskRun: vi.fn().mockResolvedValue({}), }), applyTreeSnapshot: vi.fn().mockResolvedValue(undefined), applyGitCheckpoint: vi.fn().mockResolvedValue(undefined), @@ -97,7 +98,6 @@ function createResumeState( conversation: [], latestSnapshot: null, latestGitCheckpoint: null, - snapshotApplied: false, interrupted: false, logEntryCount: 0, ...overrides, diff --git a/apps/code/src/main/services/handoff/handoff-saga.ts b/apps/code/src/main/services/handoff/handoff-saga.ts index f39e74f76..0c783554f 100644 --- a/apps/code/src/main/services/handoff/handoff-saga.ts +++ b/apps/code/src/main/services/handoff/handoff-saga.ts @@ -6,20 +6,10 @@ import { } from "@posthog/agent/resume"; import type * as AgentTypes from "@posthog/agent/types"; import { Saga, type SagaLogger } from "@posthog/shared"; -import type { WorkspaceMode } from "../../db/repositories/workspace-repository"; import type { SessionResponse } from "../agent/schemas"; -import type { HandoffStep } from "./schemas"; - -export interface HandoffSagaInput { - taskId: string; - runId: string; - repoPath: string; - apiHost: string; - teamId: number; - sessionId?: string; - adapter?: "claude" | "codex"; - localGitState?: AgentTypes.HandoffLocalGitState; -} +import type { HandoffBaseDeps, HandoffExecuteInput } from "./schemas"; + +export type HandoffSagaInput = HandoffExecuteInput; export interface HandoffSagaOutput { sessionId: string; @@ -27,8 +17,7 @@ export interface HandoffSagaOutput { conversationTurns: number; } -export interface HandoffSagaDeps { - createApiClient(apiHost: string, teamId: number): PostHogAPIClient; +export interface HandoffSagaDeps extends HandoffBaseDeps { applyTreeSnapshot( snapshot: AgentTypes.TreeSnapshotEvent, repoPath: string, @@ -44,7 +33,6 @@ export interface HandoffSagaDeps { apiClient: PostHogAPIClient, localGitState?: AgentTypes.HandoffLocalGitState, ): Promise; - updateWorkspaceMode(taskId: string, mode: WorkspaceMode): void; reconnectSession(params: { taskId: string; taskRunId: string; @@ -63,9 +51,7 @@ export interface HandoffSagaDeps { localGitState?: AgentTypes.HandoffLocalGitState, ): Promise; seedLocalLogs(runId: string, logUrl: string): Promise; - killSession(taskRunId: string): Promise; setPendingContext(taskRunId: string, context: string): void; - onProgress(step: HandoffStep, message: string): void; } export class HandoffSaga extends Saga { @@ -97,6 +83,12 @@ export class HandoffSaga extends Saga { const apiClient = this.deps.createApiClient(apiHost, teamId); + await this.readOnlyStep("update_run_environment", async () => { + await apiClient.updateTaskRun(taskId, runId, { + environment: "local", + }); + }); + const { resumeState, cloudLogUrl } = await this.readOnlyStep( "fetch_and_rebuild", async () => { diff --git a/apps/code/src/main/services/handoff/handoff-to-cloud-saga.ts b/apps/code/src/main/services/handoff/handoff-to-cloud-saga.ts new file mode 100644 index 000000000..6effd7bd2 --- /dev/null +++ b/apps/code/src/main/services/handoff/handoff-to-cloud-saga.ts @@ -0,0 +1,112 @@ +import type * as AgentTypes from "@posthog/agent/types"; +import { Saga, type SagaLogger } from "@posthog/shared"; +import type { HandoffBaseDeps, HandoffToCloudExecuteInput } from "./schemas"; + +export type HandoffToCloudSagaInput = HandoffToCloudExecuteInput; + +export interface HandoffToCloudSagaOutput { + checkpointCaptured: boolean; + snapshotCaptured: boolean; + flushedLogEntryCount: number; +} + +export interface HandoffToCloudSagaDeps extends HandoffBaseDeps { + captureGitCheckpoint( + localGitState?: AgentTypes.HandoffLocalGitState, + ): Promise; + captureTreeSnapshot(): Promise; + persistCheckpointToLog( + checkpoint: AgentTypes.GitCheckpointEvent, + ): Promise; + persistSnapshotToLog(snapshot: AgentTypes.TreeSnapshotEvent): Promise; + flushLocalLogs(): Promise; + resumeRunInCloud(): Promise; +} + +export class HandoffToCloudSaga extends Saga< + HandoffToCloudSagaInput, + HandoffToCloudSagaOutput +> { + readonly sagaName = "HandoffToCloudSaga"; + private deps: HandoffToCloudSagaDeps; + + constructor(deps: HandoffToCloudSagaDeps, logger?: SagaLogger) { + super(logger); + this.deps = deps; + } + + protected async execute( + input: HandoffToCloudSagaInput, + ): Promise { + const { taskId, runId } = input; + + let checkpointCaptured = false; + let snapshotCaptured = false; + + this.deps.onProgress( + "capturing_checkpoint", + "Capturing local git state...", + ); + + const checkpoint = await this.readOnlyStep("capture_git_checkpoint", () => + this.deps.captureGitCheckpoint(input.localGitState), + ); + + let persistedNotificationCount = 0; + + if (checkpoint) { + await this.readOnlyStep("persist_checkpoint_to_log", () => + this.deps.persistCheckpointToLog(checkpoint), + ); + checkpointCaptured = true; + persistedNotificationCount++; + } + + this.deps.onProgress("capturing_snapshot", "Capturing local file state..."); + + const snapshot = await this.readOnlyStep("capture_tree_snapshot", () => + this.deps.captureTreeSnapshot(), + ); + + if (snapshot) { + await this.readOnlyStep("persist_snapshot_to_log", () => + this.deps.persistSnapshotToLog(snapshot), + ); + snapshotCaptured = true; + persistedNotificationCount++; + } + + const localLogLineCount = await this.readOnlyStep("flush_local_logs", () => + this.deps.flushLocalLogs(), + ); + const flushedLogEntryCount = localLogLineCount + persistedNotificationCount; + + this.deps.onProgress("starting_cloud_run", "Starting cloud sandbox..."); + + await this.step({ + name: "start_cloud_run", + execute: () => this.deps.resumeRunInCloud(), + rollback: async () => {}, + }); + + this.deps.onProgress("stopping_agent", "Stopping local agent..."); + + await this.readOnlyStep("stop_local_agent", () => + this.deps.killSession(runId), + ); + + await this.step({ + name: "update_workspace", + execute: async () => { + this.deps.updateWorkspaceMode(taskId, "cloud"); + }, + rollback: async () => { + this.deps.updateWorkspaceMode(taskId, "local"); + }, + }); + + this.deps.onProgress("complete", "Handoff to cloud complete"); + + return { checkpointCaptured, snapshotCaptured, flushedLogEntryCount }; + } +} diff --git a/apps/code/src/main/services/handoff/schemas.ts b/apps/code/src/main/services/handoff/schemas.ts index a9c571cee..97ea85adc 100644 --- a/apps/code/src/main/services/handoff/schemas.ts +++ b/apps/code/src/main/services/handoff/schemas.ts @@ -1,14 +1,26 @@ +import type { PostHogAPIClient } from "@posthog/agent/posthog-api"; import { handoffLocalGitStateSchema } from "@posthog/agent/server/schemas"; import { z } from "zod"; +import type { WorkspaceMode } from "../../db/repositories/workspace-repository"; -export const handoffPreflightInput = z.object({ +const handoffBaseInput = z.object({ taskId: z.string(), runId: z.string(), repoPath: z.string(), +}); + +const handoffApiInput = handoffBaseInput.extend({ apiHost: z.string(), teamId: z.number(), }); +const handoffBaseResult = z.object({ + success: z.boolean(), + error: z.string().optional(), +}); + +export const handoffPreflightInput = handoffApiInput; + export type HandoffPreflightInput = z.infer; export const handoffPreflightResult = z.object({ @@ -20,12 +32,7 @@ export const handoffPreflightResult = z.object({ export type HandoffPreflightResult = z.infer; -export const handoffExecuteInput = z.object({ - taskId: z.string(), - runId: z.string(), - repoPath: z.string(), - apiHost: z.string(), - teamId: z.number(), +export const handoffExecuteInput = handoffApiInput.extend({ sessionId: z.string().optional(), adapter: z.enum(["claude", "codex"]).optional(), localGitState: handoffLocalGitStateSchema.optional(), @@ -33,20 +40,53 @@ export const handoffExecuteInput = z.object({ export type HandoffExecuteInput = z.infer; -export const handoffExecuteResult = z.object({ - success: z.boolean(), +export const handoffExecuteResult = handoffBaseResult.extend({ sessionId: z.string().optional(), - error: z.string().optional(), }); export type HandoffExecuteResult = z.infer; +export const handoffToCloudPreflightInput = handoffBaseInput; + +export type HandoffToCloudPreflightInput = z.infer< + typeof handoffToCloudPreflightInput +>; + +export const handoffToCloudPreflightResult = z.object({ + canHandoff: z.boolean(), + reason: z.string().optional(), + localGitState: handoffLocalGitStateSchema.optional(), +}); + +export type HandoffToCloudPreflightResult = z.infer< + typeof handoffToCloudPreflightResult +>; + +export const handoffToCloudExecuteInput = handoffApiInput.extend({ + localGitState: handoffLocalGitStateSchema.optional(), +}); + +export type HandoffToCloudExecuteInput = z.infer< + typeof handoffToCloudExecuteInput +>; + +export const handoffToCloudExecuteResult = handoffBaseResult.extend({ + logEntryCount: z.number().optional(), +}); + +export type HandoffToCloudExecuteResult = z.infer< + typeof handoffToCloudExecuteResult +>; + export type HandoffStep = | "fetching_logs" | "applying_git_checkpoint" | "applying_snapshot" - | "updating_run" | "spawning_agent" + | "capturing_checkpoint" + | "capturing_snapshot" + | "stopping_agent" + | "starting_cloud_run" | "complete" | "failed"; @@ -63,3 +103,10 @@ export const HandoffEvent = { export interface HandoffServiceEvents { [HandoffEvent.Progress]: HandoffProgressPayload; } + +export interface HandoffBaseDeps { + createApiClient(apiHost: string, teamId: number): PostHogAPIClient; + killSession(taskRunId: string): Promise; + updateWorkspaceMode(taskId: string, mode: WorkspaceMode): void; + onProgress(step: HandoffStep, message: string): void; +} diff --git a/apps/code/src/main/services/handoff/service.ts b/apps/code/src/main/services/handoff/service.ts index 46d253b84..93b96e190 100644 --- a/apps/code/src/main/services/handoff/service.ts +++ b/apps/code/src/main/services/handoff/service.ts @@ -1,8 +1,9 @@ -import { mkdirSync, writeFileSync } from "node:fs"; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; import { join } from "node:path"; import { MAIN_TOKENS } from "@main/di/tokens"; import { logger } from "@main/utils/logger"; import { TypedEventEmitter } from "@main/utils/typed-event-emitter"; +import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import { HandoffCheckpointTracker } from "@posthog/agent/handoff-checkpoint"; import { PostHogAPIClient } from "@posthog/agent/posthog-api"; import { TreeTracker } from "@posthog/agent/tree-tracker"; @@ -19,6 +20,10 @@ import type { AgentService } from "../agent/service"; import type { CloudTaskService } from "../cloud-task/service"; import type { GitService } from "../git/service"; import { HandoffSaga, type HandoffSagaDeps } from "./handoff-saga"; +import { + HandoffToCloudSaga, + type HandoffToCloudSagaDeps, +} from "./handoff-to-cloud-saga"; import { HandoffEvent, type HandoffExecuteInput, @@ -26,6 +31,10 @@ import { type HandoffPreflightInput, type HandoffPreflightResult, type HandoffServiceEvents, + type HandoffToCloudExecuteInput, + type HandoffToCloudExecuteResult, + type HandoffToCloudPreflightInput, + type HandoffToCloudPreflightResult, } from "./schemas"; const log = logger.scope("handoff"); @@ -72,13 +81,8 @@ export class HandoffService extends TypedEventEmitter { async execute(input: HandoffExecuteInput): Promise { const deps: HandoffSagaDeps = { - createApiClient: (apiHost: string, teamId: number) => { - const config = this.agentAuthAdapter.createPosthogConfig({ - apiHost, - projectId: teamId, - }); - return new PostHogAPIClient(config); - }, + createApiClient: (apiHost, teamId) => + this.createApiClient(apiHost, teamId), applyTreeSnapshot: async ( snapshot: AgentTypes.TreeSnapshotEvent, @@ -158,7 +162,8 @@ export class HandoffService extends TypedEventEmitter { runId, ); mkdirSync(logDir, { recursive: true }); - writeFileSync(join(logDir, "logs.ndjson"), content); + const marker = JSON.stringify({ type: "seed_boundary" }); + writeFileSync(join(logDir, "logs.ndjson"), `${content}\n${marker}`); log.info("Seeded local logs from cloud", { runId, bytes: content.length, @@ -207,6 +212,176 @@ export class HandoffService extends TypedEventEmitter { }; } + async preflightToCloud( + input: HandoffToCloudPreflightInput, + ): Promise { + const { repoPath } = input; + + let localGitState: AgentTypes.HandoffLocalGitState | undefined; + try { + localGitState = await this.getLocalGitState(repoPath); + } catch (err) { + log.warn("Failed to read local git state for cloud handoff", { + repoPath, + err, + }); + } + + return { canHandoff: true, localGitState }; + } + + async executeToCloud( + input: HandoffToCloudExecuteInput, + ): Promise { + const { taskId, runId, repoPath, apiHost, teamId } = input; + const apiClient = this.createApiClient(apiHost, teamId); + + const checkpointTracker = new HandoffCheckpointTracker({ + repositoryPath: repoPath, + taskId, + runId, + apiClient, + }); + + const treeTracker = new TreeTracker({ + repositoryPath: repoPath, + taskId, + runId, + apiClient, + }); + + const appendNotification = async ( + method: string, + params: Record, + ) => { + await apiClient.appendTaskRunLog(taskId, runId, [ + { + type: "notification", + timestamp: new Date().toISOString(), + notification: { jsonrpc: "2.0", method, params }, + }, + ]); + }; + + const deps: HandoffToCloudSagaDeps = { + createApiClient: () => apiClient, + + captureGitCheckpoint: async (localGitState) => { + const checkpoint = + await checkpointTracker.captureForHandoff(localGitState); + if (!checkpoint) return null; + return { ...checkpoint, device: { type: "local" as const } }; + }, + + captureTreeSnapshot: async () => { + const snapshot = await treeTracker.captureTree({}); + if (!snapshot) return null; + return { ...snapshot, device: { type: "local" as const } }; + }, + + persistCheckpointToLog: (checkpoint) => + appendNotification( + POSTHOG_NOTIFICATIONS.GIT_CHECKPOINT, + checkpoint as unknown as Record, + ), + + persistSnapshotToLog: (snapshot) => + appendNotification( + POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, + snapshot as unknown as Record, + ), + + flushLocalLogs: async () => { + const logPath = join( + app.getPath("home"), + ".posthog-code", + "sessions", + runId, + "logs.ndjson", + ); + if (!existsSync(logPath)) return 0; + + const lines = readFileSync(logPath, "utf-8") + .split("\n") + .filter((l) => l.trim()); + if (lines.length === 0) return 0; + + const boundaryIndex = lines.findIndex((l) => { + try { + return JSON.parse(l).type === "seed_boundary"; + } catch { + return false; + } + }); + const newLines = + boundaryIndex >= 0 ? lines.slice(boundaryIndex + 1) : lines; + + const entries: AgentTypes.StoredEntry[] = []; + for (const line of newLines) { + try { + entries.push(JSON.parse(line)); + } catch { + // skip + } + } + + if (entries.length > 0) { + await apiClient.appendTaskRunLog(taskId, runId, entries); + log.info("Flushed local logs to cloud", { + runId, + entries: entries.length, + }); + } + + return lines.length; + }, + + resumeRunInCloud: async () => { + await apiClient.resumeRunInCloud(taskId, runId); + }, + + killSession: async (taskRunId) => { + await this.agentService.cancelSession(taskRunId); + }, + + updateWorkspaceMode: (tid, mode) => { + this.workspaceRepo.updateMode(tid, mode); + }, + + onProgress: (step, message) => { + this.emit(HandoffEvent.Progress, { taskId, step, message }); + }, + }; + + const saga = new HandoffToCloudSaga(deps, log); + const result = await saga.run(input); + + if (!result.success) { + log.error("Handoff to cloud saga failed", { + error: result.error, + failedStep: result.failedStep, + }); + deps.onProgress("failed", result.error ?? "Handoff to cloud failed"); + return { + success: false, + error: `Handoff to cloud failed at step '${result.failedStep}': ${result.error}`, + }; + } + + return { + success: true, + logEntryCount: result.data.flushedLogEntryCount, + }; + } + + private createApiClient(apiHost: string, teamId: number): PostHogAPIClient { + const config = this.agentAuthAdapter.createPosthogConfig({ + apiHost, + projectId: teamId, + }); + return new PostHogAPIClient(config); + } + private async getLocalGitState( repoPath: string, ): Promise { diff --git a/apps/code/src/main/trpc/routers/handoff.ts b/apps/code/src/main/trpc/routers/handoff.ts index 2b3ef2c2a..d231a70df 100644 --- a/apps/code/src/main/trpc/routers/handoff.ts +++ b/apps/code/src/main/trpc/routers/handoff.ts @@ -7,6 +7,10 @@ import { handoffExecuteResult, handoffPreflightInput, handoffPreflightResult, + handoffToCloudExecuteInput, + handoffToCloudExecuteResult, + handoffToCloudPreflightInput, + handoffToCloudPreflightResult, } from "../../services/handoff/schemas"; import type { HandoffService } from "../../services/handoff/service"; import { publicProcedure, router } from "../trpc"; @@ -25,6 +29,16 @@ export const handoffRouter = router({ .output(handoffExecuteResult) .mutation(({ input }) => getService().execute(input)), + preflightToCloud: publicProcedure + .input(handoffToCloudPreflightInput) + .output(handoffToCloudPreflightResult) + .query(({ input }) => getService().preflightToCloud(input)), + + executeToCloud: publicProcedure + .input(handoffToCloudExecuteInput) + .output(handoffToCloudExecuteResult) + .mutation(({ input }) => getService().executeToCloud(input)), + onProgress: publicProcedure .input(z.object({ taskId: z.string() })) .subscription(async function* (opts) { diff --git a/apps/code/src/renderer/api/posthogClient.ts b/apps/code/src/renderer/api/posthogClient.ts index cde296d3c..d83f8392d 100644 --- a/apps/code/src/renderer/api/posthogClient.ts +++ b/apps/code/src/renderer/api/posthogClient.ts @@ -785,6 +785,24 @@ export class PostHogAPIClient { return data as unknown as Task; } + async resumeRunInCloud(taskId: string, runId: string): Promise { + const teamId = await this.getTeamId(); + const url = new URL( + `${this.api.baseUrl}/api/projects/${teamId}/tasks/${taskId}/runs/${runId}/resume_in_cloud/`, + ); + const response = await this.api.fetcher.fetch({ + method: "post", + url, + path: `/api/projects/${teamId}/tasks/${taskId}/runs/${runId}/resume_in_cloud/`, + }); + + if (!response.ok) { + throw new Error(`Failed to resume run in cloud: ${response.statusText}`); + } + + return (await response.json()) as TaskRun; + } + async listTaskRuns(taskId: string): Promise { const teamId = await this.getTeamId(); const url = new URL( diff --git a/apps/code/src/renderer/components/HeaderRow.tsx b/apps/code/src/renderer/components/HeaderRow.tsx index 8623cf5d4..8a27cde0a 100644 --- a/apps/code/src/renderer/components/HeaderRow.tsx +++ b/apps/code/src/renderer/components/HeaderRow.tsx @@ -1,14 +1,46 @@ +import { useAuthStateValue } from "@features/auth/hooks/authQueries"; import { DiffStatsBadge } from "@features/code-review/components/DiffStatsBadge"; import { CloudGitInteractionHeader } from "@features/git-interaction/components/CloudGitInteractionHeader"; import { GitInteractionHeader } from "@features/git-interaction/components/GitInteractionHeader"; +import { useSessionForTask } from "@features/sessions/hooks/useSession"; +import { useSessionCallbacks } from "@features/sessions/hooks/useSessionCallbacks"; import { SidebarTrigger } from "@features/sidebar/components/SidebarTrigger"; import { useSidebarStore } from "@features/sidebar/stores/sidebarStore"; import { useWorkspace } from "@features/workspace/hooks/useWorkspace"; -import { Box, Flex } from "@radix-ui/themes"; +import { Box, Button, Flex, Text } from "@radix-ui/themes"; +import type { Task } from "@shared/types"; import { useHeaderStore } from "@stores/headerStore"; import { useNavigationStore } from "@stores/navigationStore"; import { isWindows } from "@utils/platform"; +function LocalHandoffButton({ taskId, task }: { taskId: string; task: Task }) { + const session = useSessionForTask(taskId); + const workspace = useWorkspace(taskId); + const repoPath = workspace?.folderPath ?? null; + const authStatus = useAuthStateValue((s) => s.status); + const { handleContinueInCloud } = useSessionCallbacks({ + taskId, + task, + session: session ?? undefined, + repoPath, + }); + + if (authStatus !== "authenticated") return null; + + return ( + + ); +} + export const HEADER_HEIGHT = 36; const COLLAPSED_WIDTH = 110; /** Width reserved for Windows title bar buttons (Close/Minimize/Maximize) */ @@ -100,16 +132,19 @@ export function HeaderRow() { flexShrink: 0, }} > -
+ {isCloudTask ? ( ) : ( - + <> + + + )} -
+ )} diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts index b701b5e3a..f61ad7286 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts @@ -204,6 +204,17 @@ export function useSessionCallbacks({ } }, [taskId, task.repository]); + const handleContinueInCloud = useCallback(async () => { + if (!repoPath) return; + try { + await getSessionService().handoffToCloud(taskId, repoPath); + } catch (error) { + log.error("Failed to hand off to cloud", error); + const message = error instanceof Error ? error.message : "Unknown error"; + toast.error(`Failed to continue in cloud: ${message}`); + } + }, [taskId, repoPath]); + return { handleSendPrompt, handleCancelPrompt, @@ -211,5 +222,6 @@ export function useSessionCallbacks({ handleNewSession, handleBashCommand, handleContinueLocally, + handleContinueInCloud, }; } diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts index b41c2e5be..75d79dfb5 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionConnection.ts @@ -99,7 +99,7 @@ export function useSessionConnection({ if (!repoPath) return; if (connectingTasks.has(taskId)) return; if (!isOnline) return; - if (isCloud) return; + if (isCloud || session?.isCloud) return; if (isSuspended) return; if ( diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 6da2966ec..ec4fe0048 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1212,6 +1212,18 @@ export class SessionService { return this.resumeCloudRun(session, rawPromptText); } + if (session.cloudStatus !== "in_progress") { + sessionStoreSetters.enqueueMessage(session.taskId, rawPromptText); + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: true, + }); + log.info("Cloud message queued (sandbox not ready)", { + taskId: session.taskId, + cloudStatus: session.cloudStatus, + }); + return { stopReason: "queued" }; + } + if (!options?.skipQueueGuard && session.isPromptPending) { sessionStoreSetters.enqueueMessage(session.taskId, rawPromptText); log.info("Cloud message queued", { @@ -2149,6 +2161,76 @@ export class SessionService { } } + async handoffToCloud(taskId: string, repoPath: string): Promise { + const session = sessionStoreSetters.getSessionByTaskId(taskId); + if (!session) { + log.warn("No session found for cloud handoff", { taskId }); + return; + } + + const runId = session.taskRunId; + const auth = await this.getHandoffAuth(); + if (!auth) return; + + sessionStoreSetters.updateSession(runId, { handoffInProgress: true }); + + try { + const preflight = await trpcClient.handoff.preflightToCloud.query({ + taskId, + runId, + repoPath, + }); + if (!preflight.canHandoff) { + sessionStoreSetters.updateSession(runId, { + handoffInProgress: false, + }); + throw new Error(preflight.reason ?? "Cannot hand off to cloud"); + } + + this.unsubscribeFromChannel(runId); + sessionStoreSetters.updateSession(runId, { status: "connecting" }); + + const result = await trpcClient.handoff.executeToCloud.mutate({ + taskId, + runId, + repoPath, + apiHost: auth.apiHost, + teamId: auth.projectId, + localGitState: preflight.localGitState, + }); + if (!result.success) { + throw new Error(result.error ?? "Handoff to cloud failed"); + } + + sessionStoreSetters.updateSession(runId, { + isCloud: true, + cloudStatus: undefined, + cloudStage: undefined, + cloudOutput: undefined, + cloudErrorMessage: undefined, + cloudBranch: undefined, + handoffInProgress: false, + status: "disconnected", + processedLineCount: result.logEntryCount ?? 0, + }); + + this.watchCloudTask(taskId, runId, auth.apiHost, auth.projectId); + queryClient.invalidateQueries({ queryKey: ["tasks"] }); + queryClient.invalidateQueries(trpc.workspace.getAll.pathFilter()); + log.info("Local-to-cloud handoff complete", { taskId, runId }); + } catch (err) { + log.error("Handoff to cloud failed", { taskId, err }); + toast.error( + err instanceof Error ? err.message : "Handoff to cloud failed", + ); + this.subscribeToChannel(runId); + sessionStoreSetters.updateSession(runId, { + handoffInProgress: false, + status: "disconnected", + }); + } + } + private async getHandoffAuth(): Promise<{ apiHost: string; projectId: number; diff --git a/packages/agent/src/handoff-checkpoint.ts b/packages/agent/src/handoff-checkpoint.ts index 03ea9ad0c..818535a33 100644 --- a/packages/agent/src/handoff-checkpoint.ts +++ b/packages/agent/src/handoff-checkpoint.ts @@ -113,7 +113,7 @@ export class HandoffCheckpointTracker { divergence: GitHandoffBranchDivergence, ) => Promise; }, - ): Promise { + ): Promise<{ packBytes: number; indexBytes: number; totalBytes: number }> { if (!this.apiClient) { throw new Error( "Cannot apply handoff checkpoint: API client not configured", @@ -152,6 +152,12 @@ export class HandoffCheckpointTracker { }); this.logApplyMetrics(checkpoint, downloads, applyResult.totalBytes); + + return { + packBytes: downloads.pack?.rawBytes ?? 0, + indexBytes: downloads.index?.rawBytes ?? 0, + totalBytes: applyResult.totalBytes, + }; } finally { await this.removeIfPresent(packPath); await this.removeIfPresent(indexPath); @@ -207,23 +213,24 @@ export class HandoffCheckpointTracker { } private async uploadArtifacts(specs: UploadArtifactSpec[]): Promise { - const uploads = await Promise.all( - specs.map(async (spec) => { - if (!spec.filePath) { - return [spec.key, undefined] as const; - } - return [ - spec.key, - await this.uploadArtifactFile( - spec.filePath, - spec.name, - spec.contentType, - ), - ] as const; - }), - ); + const results: Array = + []; + for (const spec of specs) { + if (!spec.filePath) { + results.push([spec.key, undefined] as const); + continue; + } + results.push([ + spec.key, + await this.uploadArtifactFile( + spec.filePath, + spec.name, + spec.contentType, + ), + ] as const); + } - return Object.fromEntries(uploads) as Uploads; + return Object.fromEntries(results) as Uploads; } private async downloadArtifactToFile( @@ -240,10 +247,6 @@ export class HandoffCheckpointTracker { this.runId, artifactPath, ); - if (!arrayBuffer) { - throw new Error(`Failed to download ${label}`); - } - const base64Content = Buffer.from(arrayBuffer).toString("utf-8"); const binaryContent = Buffer.from(base64Content, "base64"); await writeFile(filePath, binaryContent); diff --git a/packages/agent/src/posthog-api.ts b/packages/agent/src/posthog-api.ts index f50578e87..b6fedb1b9 100644 --- a/packages/agent/src/posthog-api.ts +++ b/packages/agent/src/posthog-api.ts @@ -143,6 +143,14 @@ export class PostHogAPIClient { ); } + async resumeRunInCloud(taskId: string, runId: string): Promise { + const teamId = this.getTeamId(); + return this.apiRequest( + `/api/projects/${teamId}/tasks/${taskId}/runs/${runId}/resume_in_cloud/`, + { method: "POST" }, + ); + } + async updateTaskRun( taskId: string, runId: string, @@ -217,7 +225,7 @@ export class PostHogAPIClient { taskId: string, runId: string, storagePath: string, - ): Promise { + ): Promise { const teamId = this.getTeamId(); try { const response = await this.apiRequest<{ @@ -231,34 +239,26 @@ export class PostHogAPIClient { }, ); return response.url; - } catch { - return null; + } catch (error) { + throw new Error( + `Presign failed for ${storagePath}: ${error instanceof Error ? error.message : String(error)}`, + ); } } - /** - * Download artifact content by storage path - * Gets a presigned URL and fetches the content - */ async downloadArtifact( taskId: string, runId: string, storagePath: string, - ): Promise { + ): Promise { const url = await this.getArtifactPresignedUrl(taskId, runId, storagePath); - if (!url) { - return null; - } - - try { - const response = await fetch(url); - if (!response.ok) { - throw new Error(`Failed to download artifact: ${response.status}`); - } - return response.arrayBuffer(); - } catch { - return null; + const response = await fetch(url); + if (!response.ok) { + throw new Error( + `Artifact fetch failed for ${storagePath}: HTTP ${response.status}`, + ); } + return response.arrayBuffer(); } /** diff --git a/packages/agent/src/resume.ts b/packages/agent/src/resume.ts index 84dcf5103..dd7bd1bc8 100644 --- a/packages/agent/src/resume.ts +++ b/packages/agent/src/resume.ts @@ -30,8 +30,6 @@ export interface ResumeState { conversation: ConversationTurn[]; latestSnapshot: TreeSnapshotEvent | null; latestGitCheckpoint: GitCheckpointEvent | null; - /** Whether the tree snapshot was successfully applied (files restored) */ - snapshotApplied: boolean; interrupted: boolean; lastDevice?: DeviceInfo; logEntryCount: number; @@ -61,11 +59,7 @@ export interface ResumeConfig { /** * Resume a task from its persisted log. * Returns the rebuilt state for the agent to continue from. - * - * Uses Saga pattern internally for atomic operations. - * Note: snapshotApplied field indicates if files were actually restored - - * even if latestSnapshot is non-null, files may not have been restored if - * the snapshot had no archive URL or download/extraction failed. + * Snapshot and checkpoint application happens in the agent server after SSE connects. */ export async function resumeFromLog( config: ResumeConfig, @@ -102,7 +96,6 @@ export async function resumeFromLog( conversation: result.data.conversation as ConversationTurn[], latestSnapshot: result.data.latestSnapshot, latestGitCheckpoint: result.data.latestGitCheckpoint, - snapshotApplied: result.data.snapshotApplied, interrupted: result.data.interrupted, lastDevice: result.data.lastDevice, logEntryCount: result.data.logEntryCount, @@ -124,15 +117,31 @@ export function conversationToPromptHistory( const RESUME_HISTORY_TOKEN_BUDGET = 50_000; const TOOL_RESULT_MAX_CHARS = 2000; +const RESUME_CONTEXT_MARKERS = [ + "You are resuming a previous conversation", + "Here is the conversation history from the", + "Continue from where you left off", +]; + +function isResumeContextTurn(turn: ConversationTurn): boolean { + if (turn.role !== "user") return false; + const text = turn.content + .filter((b) => b.type === "text") + .map((b) => (b as { type: "text"; text: string }).text) + .join(""); + return RESUME_CONTEXT_MARKERS.some((marker) => text.includes(marker)); +} + export function formatConversationForResume( conversation: ConversationTurn[], ): string { - const selected = selectRecentTurns(conversation, RESUME_HISTORY_TOKEN_BUDGET); + const filtered = conversation.filter((turn) => !isResumeContextTurn(turn)); + const selected = selectRecentTurns(filtered, RESUME_HISTORY_TOKEN_BUDGET); const parts: string[] = []; - if (selected.length < conversation.length) { + if (selected.length < filtered.length) { parts.push( - `*(${conversation.length - selected.length} earlier turns omitted)*`, + `*(${filtered.length - selected.length} earlier turns omitted)*`, ); } diff --git a/packages/agent/src/sagas/resume-saga.test.ts b/packages/agent/src/sagas/resume-saga.test.ts index b924adbb3..eaabc25d3 100644 --- a/packages/agent/src/sagas/resume-saga.test.ts +++ b/packages/agent/src/sagas/resume-saga.test.ts @@ -52,7 +52,6 @@ describe("ResumeSaga", () => { if (result.success) { expect(result.data.conversation).toHaveLength(0); expect(result.data.latestSnapshot).toBeNull(); - expect(result.data.snapshotApplied).toBe(false); expect(result.data.logEntryCount).toBe(0); } }); @@ -505,8 +504,6 @@ describe("ResumeSaga", () => { expect(result.success).toBe(true); if (!result.success) return; - expect(result.data.snapshotApplied).toBe(true); - const content = await repo.readFile("restored.ts"); expect(content).toBe("restored content"); }); @@ -533,7 +530,6 @@ describe("ResumeSaga", () => { expect(result.success).toBe(true); if (!result.success) return; - expect(result.data.snapshotApplied).toBe(false); expect(result.data.latestSnapshot).not.toBeNull(); expect(result.data.conversation).toHaveLength(1); }); @@ -563,7 +559,6 @@ describe("ResumeSaga", () => { expect(result.success).toBe(true); if (!result.success) return; - expect(result.data.snapshotApplied).toBe(false); expect(result.data.conversation).toHaveLength(1); expect(mockLogger.warn).toHaveBeenCalled(); }); diff --git a/packages/agent/src/sagas/resume-saga.ts b/packages/agent/src/sagas/resume-saga.ts index d8658b19e..8810675fc 100644 --- a/packages/agent/src/sagas/resume-saga.ts +++ b/packages/agent/src/sagas/resume-saga.ts @@ -2,14 +2,13 @@ import type { ContentBlock } from "@agentclientprotocol/sdk"; import { Saga } from "@posthog/shared"; import { isNotification, POSTHOG_NOTIFICATIONS } from "../acp-extensions"; import type { PostHogAPIClient } from "../posthog-api"; -import { TreeTracker } from "../tree-tracker"; import type { DeviceInfo, GitCheckpointEvent, StoredNotification, TreeSnapshotEvent, } from "../types"; -import { Logger } from "../utils/logger"; +import type { Logger } from "../utils/logger"; export interface ConversationTurn { role: "user" | "assistant"; @@ -36,7 +35,6 @@ export interface ResumeOutput { conversation: ConversationTurn[]; latestSnapshot: TreeSnapshotEvent | null; latestGitCheckpoint: GitCheckpointEvent | null; - snapshotApplied: boolean; interrupted: boolean; lastDevice?: DeviceInfo; logEntryCount: number; @@ -46,9 +44,7 @@ export class ResumeSaga extends Saga { readonly sagaName = "ResumeSaga"; protected async execute(input: ResumeInput): Promise { - const { taskId, runId, repositoryPath, apiClient } = input; - const logger = - input.logger || new Logger({ debug: false, prefix: "[Resume]" }); + const { taskId, runId, apiClient } = input; // Step 1: Fetch task run (read-only) const taskRun = await this.readOnlyStep("fetch_task_run", () => @@ -83,69 +79,21 @@ export class ResumeSaga extends Saga { ); // Step 4: Apply snapshot if present (wrapped in step for consistent logging) - // Note: We use a try/catch inside the step because snapshot failure should NOT fail the saga - let snapshotApplied = false; - if (latestSnapshot?.archiveUrl && repositoryPath) { + if (latestSnapshot) { this.log.info("Found tree snapshot", { treeHash: latestSnapshot.treeHash, - hasArchiveUrl: true, + hasArchiveUrl: !!latestSnapshot.archiveUrl, changes: latestSnapshot.changes?.length ?? 0, - interrupted: latestSnapshot.interrupted, }); + } - await this.step({ - name: "apply_snapshot", - execute: async () => { - const treeTracker = new TreeTracker({ - repositoryPath, - taskId, - runId, - apiClient, - logger: logger.child("TreeTracker"), - }); - - try { - await treeTracker.applyTreeSnapshot(latestSnapshot); - treeTracker.setLastTreeHash(latestSnapshot.treeHash); - snapshotApplied = true; - this.log.info("Tree snapshot applied successfully", { - treeHash: latestSnapshot.treeHash, - }); - } catch (error) { - // Log but don't fail - continue with conversation rebuild - // ApplySnapshotSaga handles its own rollback internally - this.log.warn( - "Failed to apply tree snapshot, continuing without it", - { - error: error instanceof Error ? error.message : String(error), - treeHash: latestSnapshot.treeHash, - }, - ); - } - }, - rollback: async () => { - // Inner ApplySnapshotSaga handles its own rollback - }, + if (latestGitCheckpoint) { + this.log.info("Found git checkpoint", { + checkpointId: latestGitCheckpoint.checkpointId, + branch: latestGitCheckpoint.branch, }); - } else if (latestSnapshot?.archiveUrl && !repositoryPath) { - this.log.warn( - "Snapshot found but no repositoryPath configured - files cannot be restored", - { - treeHash: latestSnapshot.treeHash, - changes: latestSnapshot.changes?.length ?? 0, - }, - ); - } else if (latestSnapshot) { - this.log.warn( - "Snapshot found but has no archive URL - files cannot be restored", - { - treeHash: latestSnapshot.treeHash, - changes: latestSnapshot.changes?.length ?? 0, - }, - ); } - // Step 5: Rebuild conversation (read-only, pure computation) const conversation = await this.readOnlyStep("rebuild_conversation", () => Promise.resolve(this.rebuildConversation(entries)), ); @@ -158,7 +106,7 @@ export class ResumeSaga extends Saga { this.log.info("Resume state rebuilt", { turns: conversation.length, hasSnapshot: !!latestSnapshot, - snapshotApplied, + hasGitCheckpoint: !!latestGitCheckpoint, interrupted: latestSnapshot?.interrupted ?? false, }); @@ -166,7 +114,6 @@ export class ResumeSaga extends Saga { conversation, latestSnapshot, latestGitCheckpoint, - snapshotApplied, interrupted: latestSnapshot?.interrupted ?? false, lastDevice, logEntryCount: entries.length, @@ -178,7 +125,6 @@ export class ResumeSaga extends Saga { conversation: [], latestSnapshot: null, latestGitCheckpoint: null, - snapshotApplied: false, interrupted: false, logEntryCount: 0, }; diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 3258e89cc..b6e3f39c1 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -412,32 +412,9 @@ export class AgentServer { this.logger.info("Auto-initializing session", { taskId, runId, mode }); - // Check if this is a resume from a previous run const resumeRunId = process.env.POSTHOG_RESUME_RUN_ID; if (resumeRunId) { - this.logger.info("Resuming from previous run", { - resumeRunId, - currentRunId: runId, - }); - try { - this.resumeState = await resumeFromLog({ - taskId, - runId: resumeRunId, - repositoryPath: this.config.repositoryPath, - apiClient: this.posthogAPI, - logger: new Logger({ debug: true, prefix: "[Resume]" }), - }); - this.logger.info("Resume state loaded", { - conversationTurns: this.resumeState.conversation.length, - snapshotApplied: this.resumeState.snapshotApplied, - logEntries: this.resumeState.logEntryCount, - }); - } catch (error) { - this.logger.warn("Failed to load resume state, starting fresh", { - error, - }); - this.resumeState = null; - } + await this.loadResumeState(taskId, resumeRunId, runId); } // Create a synthetic payload from config (no JWT needed for auto-init) @@ -822,33 +799,14 @@ export class AgentServer { } } - // Check for resume if not already loaded from env var in autoInitializeSession if (!this.resumeState) { const resumeRunId = this.getResumeRunId(taskRun); if (resumeRunId) { - this.logger.info("Resuming from previous run (via TaskRun state)", { + await this.loadResumeState( + payload.task_id, resumeRunId, - currentRunId: payload.run_id, - }); - try { - this.resumeState = await resumeFromLog({ - taskId: payload.task_id, - runId: resumeRunId, - repositoryPath: this.config.repositoryPath, - apiClient: this.posthogAPI, - logger: new Logger({ debug: true, prefix: "[Resume]" }), - }); - this.logger.info("Resume state loaded (via TaskRun state)", { - conversationTurns: this.resumeState.conversation.length, - snapshotApplied: this.resumeState.snapshotApplied, - logEntries: this.resumeState.logEntryCount, - }); - } catch (error) { - this.logger.warn("Failed to load resume state, starting fresh", { - error, - }); - this.resumeState = null; - } + payload.run_id, + ); } } @@ -926,11 +884,70 @@ export class AgentServer { this.resumeState.conversation, ); - // Read the pending user prompt from TaskRun state (set by the workflow - // when the user sends a follow-up message that triggers a resume). + let snapshotApplied = false; + if ( + this.resumeState.latestSnapshot?.archiveUrl && + this.config.repositoryPath && + this.posthogAPI + ) { + try { + const treeTracker = new TreeTracker({ + repositoryPath: this.config.repositoryPath, + taskId: payload.task_id, + runId: payload.run_id, + apiClient: this.posthogAPI, + logger: this.logger.child("TreeTracker"), + }); + await treeTracker.applyTreeSnapshot(this.resumeState.latestSnapshot); + treeTracker.setLastTreeHash(this.resumeState.latestSnapshot.treeHash); + snapshotApplied = true; + this.logger.info("Tree snapshot applied", { + treeHash: this.resumeState.latestSnapshot.treeHash, + changes: this.resumeState.latestSnapshot.changes?.length ?? 0, + hasArchiveUrl: !!this.resumeState.latestSnapshot.archiveUrl, + }); + } catch (error) { + this.logger.warn("Failed to apply tree snapshot", { + error: error instanceof Error ? error.message : String(error), + treeHash: this.resumeState.latestSnapshot.treeHash, + }); + } + } + + if ( + this.resumeState.latestGitCheckpoint && + this.config.repositoryPath && + this.posthogAPI + ) { + try { + const checkpointTracker = new HandoffCheckpointTracker({ + repositoryPath: this.config.repositoryPath, + taskId: payload.task_id, + runId: payload.run_id, + apiClient: this.posthogAPI, + logger: this.logger.child("HandoffCheckpoint"), + }); + const metrics = await checkpointTracker.applyFromHandoff( + this.resumeState.latestGitCheckpoint, + ); + this.logger.info("Git checkpoint applied", { + branch: this.resumeState.latestGitCheckpoint.branch, + head: this.resumeState.latestGitCheckpoint.head, + packBytes: metrics.packBytes, + indexBytes: metrics.indexBytes, + totalBytes: metrics.totalBytes, + }); + } catch (error) { + this.logger.warn("Failed to apply git checkpoint", { + error: error instanceof Error ? error.message : String(error), + branch: this.resumeState.latestGitCheckpoint.branch, + }); + } + } + const pendingUserPrompt = this.getPendingUserPrompt(taskRun); - const sandboxContext = this.resumeState.snapshotApplied + const sandboxContext = snapshotApplied ? `The workspace environment (all files, packages, and code changes) has been fully restored from where you left off.` : `The workspace files from the previous session were not restored (the file snapshot may have expired), so you are starting with a fresh environment. Your conversation history is fully preserved below.`; @@ -969,7 +986,10 @@ export class AgentServer { conversationTurns: this.resumeState.conversation.length, promptLength: promptBlocksToText(resumePromptBlocks).length, hasPendingUserMessage: !!pendingUserPrompt?.length, - snapshotApplied: this.resumeState.snapshotApplied, + snapshotApplied, + hasGitCheckpoint: !!this.resumeState.latestGitCheckpoint, + gitCheckpointBranch: + this.resumeState.latestGitCheckpoint?.branch ?? null, }); // Clear resume state so it's not reused @@ -1027,6 +1047,36 @@ export class AgentServer { return prompt.length > 0 ? prompt : null; } + private async loadResumeState( + taskId: string, + resumeRunId: string, + currentRunId: string, + ): Promise { + this.logger.info("Loading resume state", { resumeRunId, currentRunId }); + try { + this.resumeState = await resumeFromLog({ + taskId, + runId: resumeRunId, + repositoryPath: this.config.repositoryPath, + apiClient: this.posthogAPI, + logger: new Logger({ debug: true, prefix: "[Resume]" }), + }); + this.logger.info("Resume state loaded", { + conversationTurns: this.resumeState.conversation.length, + hasSnapshot: !!this.resumeState.latestSnapshot, + hasGitCheckpoint: !!this.resumeState.latestGitCheckpoint, + gitCheckpointBranch: + this.resumeState.latestGitCheckpoint?.branch ?? null, + logEntries: this.resumeState.logEntryCount, + }); + } catch (error) { + this.logger.warn("Failed to load resume state, starting fresh", { + error, + }); + this.resumeState = null; + } + } + private getResumeRunId(taskRun: TaskRun | null): string | null { // Env var takes precedence (set by backend infra) const envRunId = process.env.POSTHOG_RESUME_RUN_ID;