diff --git a/plugins/codex/scripts/app-server-broker.mjs b/plugins/codex/scripts/app-server-broker.mjs index 1954274..4db3103 100644 --- a/plugins/codex/scripts/app-server-broker.mjs +++ b/plugins/codex/scripts/app-server-broker.mjs @@ -70,6 +70,7 @@ async function main() { let activeStreamSocket = null; let activeStreamThreadIds = null; const sockets = new Set(); + let shuttingDown = false; function clearSocketOwnership(socket) { if (activeRequestSocket === socket) { @@ -100,7 +101,25 @@ async function main() { } async function shutdown(server) { + if (shuttingDown) { + return; + } + shuttingDown = true; + // Notify any client whose request is still in flight that the broker is + // going down. Without this they would hang on a half-open socket waiting + // for a response that will never arrive. + const fanoutMessage = { + method: "notifications/broker/shuttingDown", + params: { + reason: appClient.exitError ? String(appClient.exitError.message ?? appClient.exitError) : "broker shutdown" + } + }; for (const socket of sockets) { + try { + send(socket, fanoutMessage); + } catch { + // Best-effort fanout. Continue tearing down. + } socket.end(); } await appClient.close().catch(() => {}); @@ -115,6 +134,21 @@ async function main() { appClient.setNotificationHandler(routeNotification); + // If the underlying codex CLI app-server dies (crashes, OOM, exits), tear + // down the broker rather than zombify with a dead client. The next companion + // process will detect a dead endpoint via ensureBrokerSession and respawn. + // See: openai/codex-plugin-cc#183. + appClient.exitPromise.then(() => { + if (shuttingDown) { + return; + } + const detail = appClient.exitError instanceof Error ? appClient.exitError.message : String(appClient.exitError ?? "unknown"); + process.stderr.write(`[broker] underlying codex app-server exited: ${detail}\n`); + shutdown(server) + .catch(() => {}) + .finally(() => process.exit(1)); + }); + const server = net.createServer((socket) => { sockets.add(socket); socket.setEncoding("utf8"); diff --git a/plugins/codex/scripts/lib/codex.mjs b/plugins/codex/scripts/lib/codex.mjs index f2fe88b..1ce93ad 100644 --- a/plugins/codex/scripts/lib/codex.mjs +++ b/plugins/codex/scripts/lib/codex.mjs @@ -597,7 +597,37 @@ async function captureTurn(client, threadId, startRequest, options = {}) { completeTurn(state, response.turn); } - return await state.completion; + // Race the turn completion against the client's exit promise so that if + // the underlying codex app-server (or broker socket) dies mid-turn, we + // reject instead of hanging forever waiting for a terminal event that + // will never arrive. See: openai/codex-plugin-cc#183. + const exitWatch = client.exitPromise.then(() => { + if (state.completed) { + return; + } + const reason = + client.exitError instanceof Error + ? client.exitError + : new Error( + client.exitError + ? String(client.exitError) + : "codex app-server connection closed before the turn completed." + ); + emitProgress( + state.onProgress, + `Codex app-server disconnected mid-turn: ${reason.message}`, + "failed" + ); + state.rejectCompletion(reason); + }); + + try { + return await state.completion; + } finally { + // Detach the exit watcher so its rejection (if any) does not surface + // as an unhandled rejection after we have already settled the turn. + exitWatch.catch(() => {}); + } } finally { clearCompletionTimer(state); client.setNotificationHandler(previousHandler ?? null); diff --git a/plugins/codex/scripts/lib/job-control.mjs b/plugins/codex/scripts/lib/job-control.mjs index 284c660..41bb821 100644 --- a/plugins/codex/scripts/lib/job-control.mjs +++ b/plugins/codex/scripts/lib/job-control.mjs @@ -1,13 +1,109 @@ import fs from "node:fs"; import { getSessionRuntimeStatus } from "./codex.mjs"; -import { getConfig, listJobs, readJobFile, resolveJobFile } from "./state.mjs"; +import { getConfig, listJobs, readJobFile, resolveJobFile, upsertJob, writeJobFile } from "./state.mjs"; +import { isProcessAlive } from "./process.mjs"; import { SESSION_ID_ENV } from "./tracked-jobs.mjs"; import { resolveWorkspaceRoot } from "./workspace.mjs"; export const DEFAULT_MAX_STATUS_JOBS = 8; export const DEFAULT_MAX_PROGRESS_LINES = 4; +function isActiveJobStatus(status) { + return status === "queued" || status === "running"; +} + +/** + * Marks a job as failed when its tracked process has died unexpectedly. + * Re-reads the latest persisted state from disk before writing to guard + * against races where the job completes legitimately at the same moment. + * + * @param {string} workspaceRoot + * @param {string} jobId + * @param {number} pid - The PID we observed as dead + * @returns {boolean} true if the job was reconciled, false if skipped + */ +export function markDeadPidJobFailed(workspaceRoot, jobId, pid) { + const jobFile = resolveJobFile(workspaceRoot, jobId); + + let latestJob; + try { + latestJob = readJobFile(jobFile); + } catch { + return false; + } + + // Only overwrite active states; never downgrade completed/failed/cancelled. + if (!isActiveJobStatus(latestJob.status)) { + return false; + } + + // Only overwrite if the PID still matches what we observed as dead. This + // guards against a job that legitimately restarted with a new PID between + // our liveness probe and the write. + if (latestJob.pid !== pid) { + return false; + } + + const completedAt = new Date().toISOString(); + const errorMessage = `Tracked process PID ${pid} exited unexpectedly without writing a terminal status.`; + + const failedPatch = { + status: "failed", + phase: "failed", + pid: null, + errorMessage, + completedAt + }; + + writeJobFile(workspaceRoot, jobId, { + ...latestJob, + ...failedPatch + }); + + upsertJob(workspaceRoot, { + id: jobId, + ...failedPatch + }); + + return true; +} + +/** + * If a job is still marked active but its tracked PID is dead, reconcile it + * to failed and return the updated record. Otherwise return the original. + * + * Called from every status read path so a single status query is enough to + * surface dead workers - no need to wait for a polling watcher. + * + * @param {string} workspaceRoot + * @param {object} job + * @returns {object} + */ +function reconcileIfDead(workspaceRoot, job) { + if (!job || !isActiveJobStatus(job.status)) { + return job; + } + const pid = Number.isFinite(job.pid) ? job.pid : null; + if (pid === null) { + return job; + } + if (isProcessAlive(pid)) { + return job; + } + + try { + markDeadPidJobFailed(workspaceRoot, job.id, pid); + } catch { + // Never let reconciliation errors crash a status read. + return job; + } + + // Re-read so the caller sees the reconciled fields. + const refreshed = readStoredJob(workspaceRoot, job.id); + return refreshed ? { ...job, ...refreshed } : job; +} + export function sortJobsNewestFirst(jobs) { return [...jobs].sort((left, right) => String(right.updatedAt ?? "").localeCompare(String(left.updatedAt ?? ""))); } @@ -213,19 +309,22 @@ function matchJobReference(jobs, reference, predicate = () => true) { export function buildStatusSnapshot(cwd, options = {}) { const workspaceRoot = resolveWorkspaceRoot(cwd); const config = getConfig(workspaceRoot); - const jobs = sortJobsNewestFirst(filterJobsForCurrentSession(listJobs(workspaceRoot), options)); + const rawJobs = sortJobsNewestFirst(filterJobsForCurrentSession(listJobs(workspaceRoot), options)); + // Reconcile any active jobs whose tracked PID is dead before partitioning, + // so a single status read surfaces stuck workers immediately. + const jobs = rawJobs.map((job) => reconcileIfDead(workspaceRoot, job)); const maxJobs = options.maxJobs ?? DEFAULT_MAX_STATUS_JOBS; const maxProgressLines = options.maxProgressLines ?? DEFAULT_MAX_PROGRESS_LINES; const running = jobs - .filter((job) => job.status === "queued" || job.status === "running") + .filter((job) => isActiveJobStatus(job.status)) .map((job) => enrichJob(job, { maxProgressLines })); - const latestFinishedRaw = jobs.find((job) => job.status !== "queued" && job.status !== "running") ?? null; + const latestFinishedRaw = jobs.find((job) => !isActiveJobStatus(job.status)) ?? null; const latestFinished = latestFinishedRaw ? enrichJob(latestFinishedRaw, { maxProgressLines }) : null; const recent = (options.all ? jobs : jobs.slice(0, maxJobs)) - .filter((job) => job.status !== "queued" && job.status !== "running" && job.id !== latestFinished?.id) + .filter((job) => !isActiveJobStatus(job.status) && job.id !== latestFinished?.id) .map((job) => enrichJob(job, { maxProgressLines })); return { @@ -247,9 +346,11 @@ export function buildSingleJobSnapshot(cwd, reference, options = {}) { throw new Error(`No job found for "${reference}". Run /codex:status to inspect known jobs.`); } + const reconciled = reconcileIfDead(workspaceRoot, selected); + return { workspaceRoot, - job: enrichJob(selected, { maxProgressLines: options.maxProgressLines }) + job: enrichJob(reconciled, { maxProgressLines: options.maxProgressLines }) }; } diff --git a/plugins/codex/scripts/lib/process.mjs b/plugins/codex/scripts/lib/process.mjs index af28d1c..6b07365 100644 --- a/plugins/codex/scripts/lib/process.mjs +++ b/plugins/codex/scripts/lib/process.mjs @@ -54,6 +54,26 @@ function looksLikeMissingProcessMessage(text) { return /not found|no running instance|cannot find|does not exist|no such process/i.test(text); } +/** + * Checks whether a process with the given PID is still running. + * Uses signal 0 which does not affect the process - it only checks existence. + * + * @param {number | null | undefined} pid + * @returns {boolean} + */ +export function isProcessAlive(pid) { + if (pid == null || !Number.isFinite(pid) || pid <= 0) { + return false; + } + try { + process.kill(pid, 0); + return true; + } catch (error) { + // ESRCH = no such process (dead). EPERM = exists but no signal permission. + return error?.code === "EPERM"; + } +} + export function terminateProcessTree(pid, options = {}) { if (!Number.isFinite(pid)) { return { attempted: false, delivered: false, method: null }; diff --git a/plugins/codex/scripts/lib/tracked-jobs.mjs b/plugins/codex/scripts/lib/tracked-jobs.mjs index 9028690..77c28c1 100644 --- a/plugins/codex/scripts/lib/tracked-jobs.mjs +++ b/plugins/codex/scripts/lib/tracked-jobs.mjs @@ -5,6 +5,22 @@ import { readJobFile, resolveJobFile, resolveJobLogFile, upsertJob, writeJobFile export const SESSION_ID_ENV = "CODEX_COMPANION_SESSION_ID"; +// Hard ceiling for any single tracked job. Default 30 minutes is generous +// enough for long Codex turns but bounded so a hung captureTurn cannot keep +// the companion process alive forever. Override via CODEX_COMPANION_JOB_TIMEOUT_MS. +const DEFAULT_JOB_TIMEOUT_MS = 30 * 60 * 1000; + +function resolveJobTimeoutMs(options = {}) { + if (Number.isFinite(options.timeoutMs) && options.timeoutMs > 0) { + return options.timeoutMs; + } + const fromEnv = Number(process.env.CODEX_COMPANION_JOB_TIMEOUT_MS); + if (Number.isFinite(fromEnv) && fromEnv > 0) { + return fromEnv; + } + return DEFAULT_JOB_TIMEOUT_MS; +} + export function nowIso() { return new Date().toISOString(); } @@ -151,8 +167,30 @@ export async function runTrackedJob(job, runner, options = {}) { writeJobFile(job.workspaceRoot, job.id, runningRecord); upsertJob(job.workspaceRoot, runningRecord); + // Race the runner against a hard timeout. If captureTurn or any other + // long-poll inside the runner hangs (e.g. underlying app-server died and + // never produced a terminal event), this guarantees the companion exits + // and the job transitions to a terminal status. See: openai/codex-plugin-cc#183. + const timeoutMs = resolveJobTimeoutMs(options); + let timeoutHandle = null; + const timeoutPromise = new Promise((_resolve, reject) => { + timeoutHandle = setTimeout(() => { + reject( + new Error( + `Tracked job ${job.id} exceeded the ${Math.round(timeoutMs / 1000)}s hard timeout. ` + + "The runner did not produce a terminal status. Set CODEX_COMPANION_JOB_TIMEOUT_MS to adjust." + ) + ); + }, timeoutMs); + timeoutHandle.unref?.(); + }); + try { - const execution = await runner(); + const execution = await Promise.race([runner(), timeoutPromise]); + if (timeoutHandle) { + clearTimeout(timeoutHandle); + timeoutHandle = null; + } const completionStatus = execution.exitStatus === 0 ? "completed" : "failed"; const completedAt = nowIso(); writeJobFile(job.workspaceRoot, job.id, { @@ -179,6 +217,10 @@ export async function runTrackedJob(job, runner, options = {}) { appendLogBlock(options.logFile ?? job.logFile ?? null, "Final output", execution.rendered); return execution; } catch (error) { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + timeoutHandle = null; + } const errorMessage = error instanceof Error ? error.message : String(error); const existing = readStoredJobOrNull(job.workspaceRoot, job.id) ?? runningRecord; const completedAt = nowIso(); diff --git a/tests/dead-pid-reconcile.test.mjs b/tests/dead-pid-reconcile.test.mjs new file mode 100644 index 0000000..7930a35 --- /dev/null +++ b/tests/dead-pid-reconcile.test.mjs @@ -0,0 +1,133 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +import { makeTempDir } from "./helpers.mjs"; +import { + buildSingleJobSnapshot, + buildStatusSnapshot, + markDeadPidJobFailed +} from "../plugins/codex/scripts/lib/job-control.mjs"; +import { ensureStateDir, upsertJob, writeJobFile } from "../plugins/codex/scripts/lib/state.mjs"; + +// Pick a PID that is virtually guaranteed to be dead. PID 999999 is well above +// the default macOS/Linux pid_max for short-lived workloads. +const DEAD_PID = 999_999; + +// Stamp test jobs with the inherited session id (if any) so they survive +// filterJobsForCurrentSession when running under Claude Code's harness. +const TEST_SESSION_ID = process.env.CODEX_COMPANION_SESSION_ID ?? null; + +function seedRunningJobWithDeadPid(workspace, jobId, pid = DEAD_PID) { + ensureStateDir(workspace); + const record = { + id: jobId, + kind: "task", + kindLabel: "rescue", + title: "Codex Task", + workspaceRoot: workspace, + jobClass: "task", + summary: "test job", + write: false, + createdAt: new Date(Date.now() - 60_000).toISOString(), + startedAt: new Date(Date.now() - 60_000).toISOString(), + updatedAt: new Date(Date.now() - 60_000).toISOString(), + status: "running", + phase: "running", + pid, + logFile: null, + ...(TEST_SESSION_ID ? { sessionId: TEST_SESSION_ID } : {}) + }; + writeJobFile(workspace, jobId, record); + upsertJob(workspace, record); +} + +test("markDeadPidJobFailed transitions a running job to failed", () => { + const workspace = makeTempDir(); + seedRunningJobWithDeadPid(workspace, "task-deadpid-1"); + + const reconciled = markDeadPidJobFailed(workspace, "task-deadpid-1", DEAD_PID); + assert.equal(reconciled, true); + + const snapshot = buildSingleJobSnapshot(workspace, "task-deadpid-1"); + assert.equal(snapshot.job.status, "failed"); + assert.equal(snapshot.job.phase, "failed"); + assert.equal(snapshot.job.pid, null); + assert.match(snapshot.job.errorMessage ?? "", /exited unexpectedly/); +}); + +test("markDeadPidJobFailed is a no-op when the job already finished", () => { + const workspace = makeTempDir(); + ensureStateDir(workspace); + const finishedRecord = { + id: "task-finished", + kind: "task", + kindLabel: "rescue", + title: "Codex Task", + workspaceRoot: workspace, + jobClass: "task", + summary: "test job", + createdAt: new Date().toISOString(), + startedAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + completedAt: new Date().toISOString(), + status: "completed", + phase: "done", + pid: null, + ...(TEST_SESSION_ID ? { sessionId: TEST_SESSION_ID } : {}) + }; + writeJobFile(workspace, "task-finished", finishedRecord); + upsertJob(workspace, finishedRecord); + + const reconciled = markDeadPidJobFailed(workspace, "task-finished", DEAD_PID); + assert.equal(reconciled, false); + + const snapshot = buildSingleJobSnapshot(workspace, "task-finished"); + assert.equal(snapshot.job.status, "completed"); +}); + +test("markDeadPidJobFailed refuses to overwrite a job whose PID has rotated", () => { + const workspace = makeTempDir(); + // Use the current process PID so the job's stored PID is alive and the + // snapshot reconciler does not interfere with what we are testing here. + seedRunningJobWithDeadPid(workspace, "task-rotated", process.pid); + + // Caller saw 999999 as dead, but the job is now tracking a different PID. + const reconciled = markDeadPidJobFailed(workspace, "task-rotated", DEAD_PID); + assert.equal(reconciled, false); + + const snapshot = buildSingleJobSnapshot(workspace, "task-rotated"); + assert.equal(snapshot.job.status, "running"); +}); + +test("buildSingleJobSnapshot reconciles a running job with a dead PID without --wait", () => { + const workspace = makeTempDir(); + seedRunningJobWithDeadPid(workspace, "task-snapshot-dead"); + + const snapshot = buildSingleJobSnapshot(workspace, "task-snapshot-dead"); + assert.equal(snapshot.job.status, "failed"); + assert.equal(snapshot.job.phase, "failed"); + assert.match(snapshot.job.errorMessage ?? "", /exited unexpectedly/); +}); + +test("buildStatusSnapshot moves dead-pid jobs out of the running list", () => { + const workspace = makeTempDir(); + seedRunningJobWithDeadPid(workspace, "task-status-dead"); + + // Pass an env without a session id so filterJobsForCurrentSession does not + // exclude our seeded jobs (the test runner inherits CODEX_COMPANION_SESSION_ID + // from Claude Code). + const snapshot = buildStatusSnapshot(workspace, { env: {} }); + assert.equal(snapshot.running.length, 0); + assert.equal(snapshot.latestFinished?.id, "task-status-dead"); + assert.equal(snapshot.latestFinished?.status, "failed"); +}); + +test("buildStatusSnapshot leaves a running job alone when its PID is alive", () => { + const workspace = makeTempDir(); + seedRunningJobWithDeadPid(workspace, "task-status-alive", process.pid); + + const snapshot = buildStatusSnapshot(workspace, { env: {} }); + assert.equal(snapshot.running.length, 1); + assert.equal(snapshot.running[0].id, "task-status-alive"); + assert.equal(snapshot.running[0].status, "running"); +}); diff --git a/tests/process.test.mjs b/tests/process.test.mjs index 80e0715..4348822 100644 --- a/tests/process.test.mjs +++ b/tests/process.test.mjs @@ -1,7 +1,30 @@ import test from "node:test"; import assert from "node:assert/strict"; +import { spawn } from "node:child_process"; -import { terminateProcessTree } from "../plugins/codex/scripts/lib/process.mjs"; +import { isProcessAlive, terminateProcessTree } from "../plugins/codex/scripts/lib/process.mjs"; + +test("isProcessAlive returns false for null and invalid pids", () => { + assert.equal(isProcessAlive(null), false); + assert.equal(isProcessAlive(undefined), false); + assert.equal(isProcessAlive(0), false); + assert.equal(isProcessAlive(-5), false); + assert.equal(isProcessAlive(Number.NaN), false); + assert.equal(isProcessAlive("123"), false); +}); + +test("isProcessAlive returns true for the current process", () => { + assert.equal(isProcessAlive(process.pid), true); +}); + +test("isProcessAlive returns false after a child has exited", async () => { + const child = spawn(process.execPath, ["-e", "process.exit(0)"], { stdio: "ignore" }); + const pid = child.pid; + await new Promise((resolve) => child.on("exit", resolve)); + // Give the kernel a moment to fully reap the entry on Linux/macOS. + await new Promise((resolve) => setTimeout(resolve, 50)); + assert.equal(isProcessAlive(pid), false); +}); test("terminateProcessTree uses taskkill on Windows", () => { let captured = null; diff --git a/tests/tracked-jobs-timeout.test.mjs b/tests/tracked-jobs-timeout.test.mjs new file mode 100644 index 0000000..f1a691e --- /dev/null +++ b/tests/tracked-jobs-timeout.test.mjs @@ -0,0 +1,88 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +import { makeTempDir } from "./helpers.mjs"; +import { runTrackedJob } from "../plugins/codex/scripts/lib/tracked-jobs.mjs"; +import { listJobs } from "../plugins/codex/scripts/lib/state.mjs"; + +test("runTrackedJob enforces a hard timeout when the runner hangs", async () => { + const workspace = makeTempDir(); + const job = { + id: "task-hang-1", + kind: "task", + kindLabel: "rescue", + title: "Hung Task", + workspaceRoot: workspace, + jobClass: "task", + summary: "test job", + write: false, + createdAt: new Date().toISOString() + }; + + // Hold the event loop open with a ref'd timer for the duration of this test. + // The production timeout uses unref so it does not keep the process alive + // when paired with a real long-running runner; in the test the hanging + // runner has nothing keeping the loop alive on its own. + const keepAlive = setInterval(() => {}, 1_000); + + let resolveRunner; + const hangingRunner = () => + new Promise((resolve) => { + resolveRunner = resolve; + }); + + try { + await assert.rejects( + () => runTrackedJob(job, hangingRunner, { timeoutMs: 50 }), + /exceeded the .+ hard timeout/ + ); + + const jobs = listJobs(workspace); + const stored = jobs.find((entry) => entry.id === "task-hang-1"); + assert.ok(stored, "expected the hung job to be persisted"); + assert.equal(stored.status, "failed"); + assert.equal(stored.phase, "failed"); + assert.equal(stored.pid, null); + assert.match(stored.errorMessage ?? "", /hard timeout/); + } finally { + // Drain the leaked runner promise and free the event loop. + resolveRunner?.({ exitStatus: 1, payload: null, rendered: "", summary: "drained" }); + clearInterval(keepAlive); + } +}); + +test("runTrackedJob still records a normal completion when the runner settles in time", async () => { + const workspace = makeTempDir(); + const job = { + id: "task-ok-1", + kind: "task", + kindLabel: "rescue", + title: "Quick Task", + workspaceRoot: workspace, + jobClass: "task", + summary: "test job", + write: false, + createdAt: new Date().toISOString() + }; + + const result = await runTrackedJob( + job, + () => + Promise.resolve({ + exitStatus: 0, + threadId: "thread-1", + turnId: "turn-1", + payload: { ok: true }, + rendered: "ok", + summary: "summary" + }), + { timeoutMs: 5_000 } + ); + + assert.equal(result.exitStatus, 0); + const jobs = listJobs(workspace); + const stored = jobs.find((entry) => entry.id === "task-ok-1"); + assert.equal(stored.status, "completed"); + assert.equal(stored.phase, "done"); + assert.equal(stored.pid, null); +});