Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- All of `bun fmt`, `bun lint`, and `bun typecheck` must pass before considering tasks completed.
- NEVER run `bun test`. Always use `bun run test` (runs Vitest).
- For any change that must be visible on the served app endpoint, rebuild and restart the served app before considering the task complete. A browser hard refresh is not sufficient for the Tailscale-served production bundle.

## Project Snapshot

Expand Down
6 changes: 6 additions & 0 deletions apps/server/src/codexAppServerManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ describe("isRecoverableThreadResumeError", () => {
),
).toBe(false);
});

// A `thread/resume` timeout error must be classified as recoverable.
it("treats thread-resume timeouts as recoverable", () => {
  const timeoutError = new Error("Timed out waiting for thread/resume.");
  const recoverable = isRecoverableThreadResumeError(timeoutError);

  expect(recoverable).toBe(true);
});
});

describe("readCodexAccountSnapshot", () => {
Expand Down
18 changes: 14 additions & 4 deletions apps/server/src/codexAppServerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ const RECOVERABLE_THREAD_RESUME_ERROR_SNIPPETS = [
"no such thread",
"unknown thread",
"does not exist",
"timed out waiting for thread/resume",
];
const CODEX_DEFAULT_MODEL = "gpt-5.3-codex";
const CODEX_SPARK_MODEL = "gpt-5.3-codex-spark";
Expand Down Expand Up @@ -394,9 +395,9 @@ export function resolveCodexModelForAccount(
}

/**
* On Windows with `shell: true`, `child.kill()` only terminates the `cmd.exe`
* wrapper, leaving the actual command running. Use `taskkill /T` to kill the
* entire process tree instead.
* Codex can spawn nested helper processes. On Unix we run each session in its
* own process group so teardown can kill the entire subtree instead of leaking
* orphaned `@github/copilot` workers after restarts and timeouts.
*/
function killChildTree(child: ChildProcessWithoutNullStreams): void {
if (process.platform === "win32" && child.pid !== undefined) {
Expand All @@ -407,7 +408,15 @@ function killChildTree(child: ChildProcessWithoutNullStreams): void {
// fallback to direct kill
}
}
child.kill();
if (child.pid !== undefined) {
try {
process.kill(-child.pid, "SIGKILL");
return;
} catch {
// fallback to direct kill
}
}
child.kill("SIGKILL");
}

export function normalizeCodexModelSlug(
Expand Down Expand Up @@ -599,6 +608,7 @@ export class CodexAppServerManager extends EventEmitter<CodexAppServerManagerEve
cwd: resolvedCwd,
env: sessionEnv,
stdio: ["pipe", "pipe", "pipe"],
detached: process.platform !== "win32",
shell: process.platform === "win32",
});
const output = readline.createInterface({ input: child.stdout });
Expand Down
119 changes: 119 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1927,6 +1927,125 @@ it.effect("restores pending turn-start metadata across projection pipeline resta
),
);

// Replays project.created, thread.created, thread.turn-start-requested and a
// `provider.turn.start.failed` activity through the projection pipeline, then
// asserts that `projection_turns` holds no pending row without a turn id for
// the thread.
it.effect("clears pending turn-start rows when provider turn start fails", () => {
  const scenario = Effect.gen(function* () {
    const eventStore = yield* OrchestrationEventStore;
    const projectionPipeline = yield* OrchestrationProjectionPipeline;
    const sql = yield* SqlClient.SqlClient;

    const threadId = ThreadId.makeUnsafe("thread-start-failure");
    const messageId = MessageId.makeUnsafe("message-start-failure");
    const now = "2026-02-26T15:00:00.000Z";
    const projectId = ProjectId.makeUnsafe("project-start-failure");

    // Persist an event, then feed the stored copy straight into the projector.
    const persistAndProject = (event: Parameters<typeof eventStore.append>[0]) =>
      eventStore
        .append(event)
        .pipe(Effect.flatMap((stored) => projectionPipeline.projectEvent(stored)));

    // 1. The project that owns the thread.
    yield* persistAndProject({
      type: "project.created",
      eventId: EventId.makeUnsafe("evt-start-failure-project"),
      aggregateKind: "project",
      aggregateId: projectId,
      occurredAt: now,
      commandId: CommandId.makeUnsafe("cmd-start-failure-project"),
      causationEventId: null,
      correlationId: CorrelationId.makeUnsafe("cmd-start-failure-project"),
      metadata: {},
      payload: {
        projectId,
        title: "Project Start Failure",
        workspaceRoot: "/tmp/project-start-failure",
        defaultModel: null,
        scripts: [],
        createdAt: now,
        updatedAt: now,
      },
    });

    // 2. The thread itself.
    yield* persistAndProject({
      type: "thread.created",
      eventId: EventId.makeUnsafe("evt-start-failure-thread"),
      aggregateKind: "thread",
      aggregateId: threadId,
      occurredAt: now,
      commandId: CommandId.makeUnsafe("cmd-start-failure-thread"),
      causationEventId: null,
      correlationId: CorrelationId.makeUnsafe("cmd-start-failure-thread"),
      metadata: {},
      payload: {
        threadId,
        projectId,
        title: "Thread Start Failure",
        model: "gpt-5-codex",
        runtimeMode: "full-access",
        branch: null,
        worktreePath: null,
        createdAt: now,
        updatedAt: now,
      },
    });

    // 3. A turn start is requested for the thread.
    yield* persistAndProject({
      type: "thread.turn-start-requested",
      eventId: EventId.makeUnsafe("evt-start-failure-requested"),
      aggregateKind: "thread",
      aggregateId: threadId,
      occurredAt: now,
      commandId: CommandId.makeUnsafe("cmd-start-failure-requested"),
      causationEventId: null,
      correlationId: CorrelationId.makeUnsafe("cmd-start-failure-requested"),
      metadata: {},
      payload: {
        threadId,
        messageId,
        runtimeMode: "full-access",
        createdAt: now,
      },
    });

    // 4. The provider reports that the turn start failed.
    yield* persistAndProject({
      type: "thread.activity-appended",
      eventId: EventId.makeUnsafe("evt-start-failure-activity"),
      aggregateKind: "thread",
      aggregateId: threadId,
      occurredAt: now,
      commandId: CommandId.makeUnsafe("cmd-start-failure-activity"),
      causationEventId: null,
      correlationId: CorrelationId.makeUnsafe("cmd-start-failure-activity"),
      metadata: {},
      payload: {
        threadId,
        activity: {
          id: EventId.makeUnsafe("activity-start-failure"),
          tone: "error",
          kind: "provider.turn.start.failed",
          summary: "Provider turn start failed",
          payload: {
            detail: "Timed out waiting for thread/start.",
          },
          turnId: null,
          createdAt: now,
        },
      },
    });

    // The query text is kept flush-left so the template contents stay unchanged.
    const leftoverPendingRows = yield* sql<{ readonly threadId: string }>`
SELECT thread_id AS "threadId"
FROM projection_turns
WHERE thread_id = ${threadId}
AND turn_id IS NULL
AND state = 'pending'
`;

    assert.deepEqual(leftoverPendingRows, []);
  });

  const testLayer = makeProjectionPipelinePrefixedTestLayer(
    "t3-projection-pipeline-start-failure-",
  );

  return scenario.pipe(Effect.provide(testLayer));
});

const engineLayer = it.layer(
OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionPipelineLive),
Expand Down
5 changes: 5 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,11 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
: {}),
createdAt: event.payload.activity.createdAt,
});
if (event.payload.activity.kind === "provider.turn.start.failed") {
yield* projectionTurnRepository.deletePendingTurnStartByThreadId({
threadId: event.payload.threadId,
});
}
return;

case "thread.reverted": {
Expand Down
125 changes: 125 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,84 @@ describe("ProviderCommandReactor", () => {
expect(thread?.session?.runtimeMode).toBe("full-access");
});

// Scenario: a first turn start triggers one startSession call, the session is
// then forced into "starting", and the next startSession call is mocked to fail.
// The thread's session must end up "stopped" with no active turn, the failure
// text in lastError, and exactly one stopSession call.
it("stops and clears a session when turn start fails while the thread is stuck starting", async () => {
  const harness = await createHarness();
  const now = new Date().toISOString();
  const threadId = ThreadId.makeUnsafe("thread-1");

  // Fetches the current read-model entry for the thread under test.
  const loadThread = async () => {
    const snapshot = await Effect.runPromise(harness.engine.getReadModel());
    return snapshot.threads.find((entry) => entry.id === threadId);
  };

  const firstTurnStart = harness.engine.dispatch({
    type: "thread.turn.start",
    commandId: CommandId.makeUnsafe("cmd-turn-start-bind-session"),
    threadId,
    message: {
      messageId: asMessageId("user-message-bind-session"),
      role: "user",
      text: "first",
      attachments: [],
    },
    interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
    runtimeMode: "approval-required",
    createdAt: now,
  });
  await Effect.runPromise(firstTurnStart);

  await waitFor(() => harness.startSession.mock.calls.length === 1);

  const forceStartingStatus = harness.engine.dispatch({
    type: "thread.session.set",
    commandId: CommandId.makeUnsafe("cmd-session-set-starting-after-bind"),
    threadId,
    session: {
      threadId,
      status: "starting",
      providerName: "codex",
      runtimeMode: "approval-required",
      activeTurnId: null,
      lastError: null,
      updatedAt: now,
    },
    createdAt: now,
  });
  await Effect.runPromise(forceStartingStatus);

  // Only the next startSession call fails.
  harness.startSession.mockImplementationOnce(
    (_: unknown, __: unknown) => Effect.fail(new Error("simulated start failure")) as never,
  );

  const failingTurnStart = harness.engine.dispatch({
    type: "thread.turn.start",
    commandId: CommandId.makeUnsafe("cmd-turn-start-fail-while-starting"),
    threadId,
    message: {
      messageId: asMessageId("user-message-fail-while-starting"),
      role: "user",
      text: "second",
      attachments: [],
    },
    interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
    runtimeMode: "approval-required",
    createdAt: now,
  });
  await Effect.runPromise(failingTurnStart);

  await waitFor(async () => {
    const current = await loadThread();
    return current?.session?.status === "stopped";
  });

  expect(harness.stopSession.mock.calls.length).toBe(1);

  const thread = await loadThread();
  expect(thread?.session?.status).toBe("stopped");
  expect(thread?.session?.activeTurnId).toBeNull();
  expect(thread?.session?.lastError).toContain("simulated start failure");
});

it("reacts to thread.turn.interrupt-requested by calling provider interrupt", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down Expand Up @@ -1147,6 +1225,53 @@ describe("ProviderCommandReactor", () => {
});
});

// Scenario: the session is set to "running" with an active turn id, then an
// interrupt is dispatched for that turn. The test expects the active turn to be
// cleared and the session to report "ready", with interruptTurn never called.
it("clears stale active turns when interrupt is requested without a live provider turn", async () => {
  const harness = await createHarness();
  const now = new Date().toISOString();
  const threadId = ThreadId.makeUnsafe("thread-1");

  // Fetches the current read-model entry for the thread under test.
  const loadThread = async () => {
    const snapshot = await Effect.runPromise(harness.engine.getReadModel());
    return snapshot.threads.find((entry) => entry.id === threadId);
  };

  const seedRunningSession = harness.engine.dispatch({
    type: "thread.session.set",
    commandId: CommandId.makeUnsafe("cmd-session-set-stale-interrupt"),
    threadId,
    session: {
      threadId,
      status: "running",
      providerName: "codex",
      runtimeMode: "approval-required",
      activeTurnId: asTurnId("turn-stale"),
      lastError: null,
      updatedAt: now,
    },
    createdAt: now,
  });
  await Effect.runPromise(seedRunningSession);

  const interruptStaleTurn = harness.engine.dispatch({
    type: "thread.turn.interrupt",
    commandId: CommandId.makeUnsafe("cmd-turn-interrupt-stale"),
    threadId,
    turnId: asTurnId("turn-stale"),
    createdAt: now,
  });
  await Effect.runPromise(interruptStaleTurn);

  await waitFor(async () => {
    const current = await loadThread();
    return current?.session?.activeTurnId === null;
  });

  expect(harness.interruptTurn.mock.calls.length).toBe(0);

  const thread = await loadThread();
  expect(thread?.session?.status).toBe("ready");
  expect(thread?.session?.activeTurnId).toBeNull();
});

it("reacts to thread.approval.respond by forwarding provider approval response", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
Loading