Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 61 additions & 76 deletions apps/code/src/main/services/git/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,12 @@ function toUnifiedDiffPatch(
@injectable()
export class GitService extends TypedEventEmitter<GitServiceEvents> {
private lastFetchTime = new Map<string, number>();
private llmGateway: LlmGatewayService;

constructor(
@inject(MAIN_TOKENS.LlmGatewayService) llmGateway: LlmGatewayService,
@inject(MAIN_TOKENS.LlmGatewayService)
private readonly llmGateway: LlmGatewayService,
) {
super();
this.llmGateway = llmGateway;
}

private async getStateSnapshot(
Expand Down Expand Up @@ -1086,85 +1085,71 @@ export class GitService extends TypedEventEmitter<GitServiceEvents> {

const [owner, repoName] = parts;

try {
const repoResult = await execGh([
"api",
`repos/${owner}/${repoName}`,
"--jq",
".default_branch",
]);
const repoResult = await execGh([
"api",
`repos/${owner}/${repoName}`,
"--jq",
".default_branch",
]);

if (repoResult.exitCode !== 0 || !repoResult.stdout.trim()) {
return [];
}
const defaultBranch = repoResult.stdout.trim();
if (repoResult.exitCode !== 0 || !repoResult.stdout.trim()) {
return [];
}
const defaultBranch = repoResult.stdout.trim();

const result = await execGh([
"api",
`repos/${owner}/${repoName}/compare/${defaultBranch}...${branch}`,
]);
const result = await execGh([
"api",
`repos/${owner}/${repoName}/compare/${defaultBranch}...${branch}`,
]);

if (result.exitCode !== 0) {
throw new Error(
`Failed to fetch branch files: ${result.stderr || result.error || "Unknown error"}`,
);
if (result.exitCode !== 0) {
throw new Error(
`Failed to fetch branch files: ${result.stderr || result.error || "Unknown error"}`,
);
}

const response = JSON.parse(result.stdout) as {
files?: Array<{
filename: string;
status: string;
previous_filename?: string;
additions: number;
deletions: number;
patch?: string;
}>;
};
const files = response.files;

if (!files) return [];

return files.map((f) => {
let status: ChangedFile["status"];
switch (f.status) {
case "added":
status = "added";
break;
case "removed":
status = "deleted";
break;
case "renamed":
status = "renamed";
break;
default:
status = "modified";
break;
}

const response = JSON.parse(result.stdout) as {
files?: Array<{
filename: string;
status: string;
previous_filename?: string;
additions: number;
deletions: number;
patch?: string;
}>;
return {
path: f.filename,
status,
originalPath: f.previous_filename,
linesAdded: f.additions,
linesRemoved: f.deletions,
patch: f.patch
? toUnifiedDiffPatch(f.patch, f.filename, f.previous_filename, status)
: undefined,
};
const files = response.files;

if (!files) return [];

return files.map((f) => {
let status: ChangedFile["status"];
switch (f.status) {
case "added":
status = "added";
break;
case "removed":
status = "deleted";
break;
case "renamed":
status = "renamed";
break;
default:
status = "modified";
break;
}

return {
path: f.filename,
status,
originalPath: f.previous_filename,
linesAdded: f.additions,
linesRemoved: f.deletions,
patch: f.patch
? toUnifiedDiffPatch(
f.patch,
f.filename,
f.previous_filename,
status,
)
: undefined,
};
});
} catch (error) {
log.warn("Failed to fetch branch changed files", {
repo,
branch,
error,
});
throw error;
}
});
}

public async generateCommitMessage(
Expand Down
2 changes: 1 addition & 1 deletion apps/code/src/main/services/handoff/handoff-saga.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ function createDeps(overrides: Partial<HandoffSagaDeps> = {}): HandoffSagaDeps {
getTaskRun: vi.fn().mockResolvedValue({
log_url: "https://logs.example.com/run-1.ndjson",
}),
updateTaskRun: vi.fn().mockResolvedValue({}),
}),
applyTreeSnapshot: vi.fn().mockResolvedValue(undefined),
applyGitCheckpoint: vi.fn().mockResolvedValue(undefined),
Expand All @@ -97,7 +98,6 @@ function createResumeState(
conversation: [],
latestSnapshot: null,
latestGitCheckpoint: null,
snapshotApplied: false,
interrupted: false,
logEntryCount: 0,
...overrides,
Expand Down
28 changes: 10 additions & 18 deletions apps/code/src/main/services/handoff/handoff-saga.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,18 @@ import {
} from "@posthog/agent/resume";
import type * as AgentTypes from "@posthog/agent/types";
import { Saga, type SagaLogger } from "@posthog/shared";
import type { WorkspaceMode } from "../../db/repositories/workspace-repository";
import type { SessionResponse } from "../agent/schemas";
import type { HandoffStep } from "./schemas";

export interface HandoffSagaInput {
taskId: string;
runId: string;
repoPath: string;
apiHost: string;
teamId: number;
sessionId?: string;
adapter?: "claude" | "codex";
localGitState?: AgentTypes.HandoffLocalGitState;
}
import type { HandoffBaseDeps, HandoffExecuteInput } from "./schemas";

export type HandoffSagaInput = HandoffExecuteInput;

export interface HandoffSagaOutput {
sessionId: string;
snapshotApplied: boolean;
conversationTurns: number;
}

export interface HandoffSagaDeps {
createApiClient(apiHost: string, teamId: number): PostHogAPIClient;
export interface HandoffSagaDeps extends HandoffBaseDeps {
applyTreeSnapshot(
snapshot: AgentTypes.TreeSnapshotEvent,
repoPath: string,
Expand All @@ -44,7 +33,6 @@ export interface HandoffSagaDeps {
apiClient: PostHogAPIClient,
localGitState?: AgentTypes.HandoffLocalGitState,
): Promise<void>;
updateWorkspaceMode(taskId: string, mode: WorkspaceMode): void;
reconnectSession(params: {
taskId: string;
taskRunId: string;
Expand All @@ -63,9 +51,7 @@ export interface HandoffSagaDeps {
localGitState?: AgentTypes.HandoffLocalGitState,
): Promise<void>;
seedLocalLogs(runId: string, logUrl: string): Promise<void>;
killSession(taskRunId: string): Promise<void>;
setPendingContext(taskRunId: string, context: string): void;
onProgress(step: HandoffStep, message: string): void;
}

export class HandoffSaga extends Saga<HandoffSagaInput, HandoffSagaOutput> {
Expand Down Expand Up @@ -97,6 +83,12 @@ export class HandoffSaga extends Saga<HandoffSagaInput, HandoffSagaOutput> {

const apiClient = this.deps.createApiClient(apiHost, teamId);

await this.readOnlyStep("update_run_environment", async () => {
await apiClient.updateTaskRun(taskId, runId, {
environment: "local",
});
});

const { resumeState, cloudLogUrl } = await this.readOnlyStep(
"fetch_and_rebuild",
async () => {
Expand Down
112 changes: 112 additions & 0 deletions apps/code/src/main/services/handoff/handoff-to-cloud-saga.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import type * as AgentTypes from "@posthog/agent/types";
import { Saga, type SagaLogger } from "@posthog/shared";
import type { HandoffBaseDeps, HandoffToCloudExecuteInput } from "./schemas";

export type HandoffToCloudSagaInput = HandoffToCloudExecuteInput;

export interface HandoffToCloudSagaOutput {
checkpointCaptured: boolean;
snapshotCaptured: boolean;
flushedLogEntryCount: number;
}

export interface HandoffToCloudSagaDeps extends HandoffBaseDeps {
captureGitCheckpoint(
localGitState?: AgentTypes.HandoffLocalGitState,
): Promise<AgentTypes.GitCheckpointEvent | null>;
captureTreeSnapshot(): Promise<AgentTypes.TreeSnapshotEvent | null>;
persistCheckpointToLog(
checkpoint: AgentTypes.GitCheckpointEvent,
): Promise<void>;
persistSnapshotToLog(snapshot: AgentTypes.TreeSnapshotEvent): Promise<void>;
flushLocalLogs(): Promise<number>;
resumeRunInCloud(): Promise<void>;
}

export class HandoffToCloudSaga extends Saga<
HandoffToCloudSagaInput,
HandoffToCloudSagaOutput
> {
readonly sagaName = "HandoffToCloudSaga";
private deps: HandoffToCloudSagaDeps;

constructor(deps: HandoffToCloudSagaDeps, logger?: SagaLogger) {
super(logger);
this.deps = deps;
}

protected async execute(
input: HandoffToCloudSagaInput,
): Promise<HandoffToCloudSagaOutput> {
const { taskId, runId } = input;

let checkpointCaptured = false;
let snapshotCaptured = false;

this.deps.onProgress(
"capturing_checkpoint",
"Capturing local git state...",
);

const checkpoint = await this.readOnlyStep("capture_git_checkpoint", () =>
this.deps.captureGitCheckpoint(input.localGitState),
);

let persistedNotificationCount = 0;

if (checkpoint) {
await this.readOnlyStep("persist_checkpoint_to_log", () =>
this.deps.persistCheckpointToLog(checkpoint),
);
checkpointCaptured = true;
persistedNotificationCount++;
}

this.deps.onProgress("capturing_snapshot", "Capturing local file state...");

const snapshot = await this.readOnlyStep("capture_tree_snapshot", () =>
this.deps.captureTreeSnapshot(),
);

if (snapshot) {
await this.readOnlyStep("persist_snapshot_to_log", () =>
this.deps.persistSnapshotToLog(snapshot),
);
snapshotCaptured = true;
persistedNotificationCount++;
}

const localLogLineCount = await this.readOnlyStep("flush_local_logs", () =>
this.deps.flushLocalLogs(),
);
const flushedLogEntryCount = localLogLineCount + persistedNotificationCount;

this.deps.onProgress("starting_cloud_run", "Starting cloud sandbox...");

await this.step({
name: "start_cloud_run",
execute: () => this.deps.resumeRunInCloud(),
rollback: async () => {},
});

this.deps.onProgress("stopping_agent", "Stopping local agent...");

await this.readOnlyStep("stop_local_agent", () =>
this.deps.killSession(runId),
);

await this.step({
name: "update_workspace",
execute: async () => {
this.deps.updateWorkspaceMode(taskId, "cloud");
},
rollback: async () => {
this.deps.updateWorkspaceMode(taskId, "local");
},
});

this.deps.onProgress("complete", "Handoff to cloud complete");

return { checkpointCaptured, snapshotCaptured, flushedLogEntryCount };
}
}
Loading
Loading