Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions plugins/codex/scripts/app-server-broker.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ async function main() {
let activeStreamSocket = null;
let activeStreamThreadIds = null;
const sockets = new Set();
let shuttingDown = false;

function clearSocketOwnership(socket) {
if (activeRequestSocket === socket) {
Expand Down Expand Up @@ -100,7 +101,25 @@ async function main() {
}

async function shutdown(server) {
if (shuttingDown) {
return;
}
shuttingDown = true;
// Notify any client whose request is still in flight that the broker is
// going down. Without this they would hang on a half-open socket waiting
// for a response that will never arrive.
const fanoutMessage = {
method: "notifications/broker/shuttingDown",
params: {
reason: appClient.exitError ? String(appClient.exitError.message ?? appClient.exitError) : "broker shutdown"
}
};
for (const socket of sockets) {
try {
send(socket, fanoutMessage);
} catch {
// Best-effort fanout. Continue tearing down.
}
socket.end();
}
await appClient.close().catch(() => {});
Expand All @@ -115,6 +134,21 @@ async function main() {

appClient.setNotificationHandler(routeNotification);

// If the underlying codex CLI app-server dies (crashes, OOM, exits), tear
// down the broker rather than zombify with a dead client. The next companion
// process will detect a dead endpoint via ensureBrokerSession and respawn.
// See: openai/codex-plugin-cc#183.
appClient.exitPromise.then(() => {
if (shuttingDown) {
return;
}
const detail = appClient.exitError instanceof Error ? appClient.exitError.message : String(appClient.exitError ?? "unknown");
process.stderr.write(`[broker] underlying codex app-server exited: ${detail}\n`);
shutdown(server)
.catch(() => {})
.finally(() => process.exit(1));
});

const server = net.createServer((socket) => {
sockets.add(socket);
socket.setEncoding("utf8");
Expand Down
32 changes: 31 additions & 1 deletion plugins/codex/scripts/lib/codex.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,37 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
completeTurn(state, response.turn);
}

return await state.completion;
// Race the turn completion against the client's exit promise so that if
// the underlying codex app-server (or broker socket) dies mid-turn, we
// reject instead of hanging forever waiting for a terminal event that
// will never arrive. See: openai/codex-plugin-cc#183.
const exitWatch = client.exitPromise.then(() => {
if (state.completed) {
return;
}
const reason =
client.exitError instanceof Error
? client.exitError
: new Error(
client.exitError
? String(client.exitError)
: "codex app-server connection closed before the turn completed."
);
emitProgress(
state.onProgress,
`Codex app-server disconnected mid-turn: ${reason.message}`,
"failed"
);
state.rejectCompletion(reason);
});

try {
return await state.completion;
} finally {
// Detach the exit watcher so its rejection (if any) does not surface
// as an unhandled rejection after we have already settled the turn.
exitWatch.catch(() => {});
}
} finally {
clearCompletionTimer(state);
client.setNotificationHandler(previousHandler ?? null);
Expand Down
113 changes: 107 additions & 6 deletions plugins/codex/scripts/lib/job-control.mjs
Original file line number Diff line number Diff line change
@@ -1,13 +1,109 @@
import fs from "node:fs";

import { getSessionRuntimeStatus } from "./codex.mjs";
import { getConfig, listJobs, readJobFile, resolveJobFile } from "./state.mjs";
import { getConfig, listJobs, readJobFile, resolveJobFile, upsertJob, writeJobFile } from "./state.mjs";
import { isProcessAlive } from "./process.mjs";
import { SESSION_ID_ENV } from "./tracked-jobs.mjs";
import { resolveWorkspaceRoot } from "./workspace.mjs";

export const DEFAULT_MAX_STATUS_JOBS = 8;
export const DEFAULT_MAX_PROGRESS_LINES = 4;

// A job is "active" while it still occupies a worker: queued or running.
// Every other status (completed/failed/cancelled) is terminal.
function isActiveJobStatus(status) {
  return ["queued", "running"].includes(status);
}

/**
 * Marks a job as failed when its tracked process has died unexpectedly.
 * Re-reads the latest persisted state from disk before writing to guard
 * against races where the job completes legitimately at the same moment.
 *
 * @param {string} workspaceRoot
 * @param {string} jobId
 * @param {number} pid - The PID we observed as dead
 * @returns {boolean} true if the job was reconciled, false if skipped
 */
export function markDeadPidJobFailed(workspaceRoot, jobId, pid) {
  let persisted;
  try {
    persisted = readJobFile(resolveJobFile(workspaceRoot, jobId));
  } catch {
    // Missing or unreadable job file: nothing we can safely reconcile.
    return false;
  }

  // Never downgrade a terminal state (completed/failed/cancelled).
  if (!isActiveJobStatus(persisted.status)) {
    return false;
  }

  // Skip if the job restarted under a different PID between our liveness
  // probe and this write - the dead PID we observed is no longer authoritative.
  if (persisted.pid !== pid) {
    return false;
  }

  const patch = {
    status: "failed",
    phase: "failed",
    pid: null,
    errorMessage: `Tracked process PID ${pid} exited unexpectedly without writing a terminal status.`,
    completedAt: new Date().toISOString()
  };

  // Persist the full record first, then mirror the patch into the job index.
  writeJobFile(workspaceRoot, jobId, { ...persisted, ...patch });
  upsertJob(workspaceRoot, { id: jobId, ...patch });

  return true;
}

/**
 * If a job is still marked active but its tracked PID is dead, reconcile it
 * to failed and return the updated record. Otherwise return the original.
 *
 * Called from every status read path so a single status query is enough to
 * surface dead workers - no need to wait for a polling watcher.
 *
 * @param {string} workspaceRoot
 * @param {object} job
 * @returns {object}
 */
function reconcileIfDead(workspaceRoot, job) {
  // Only active jobs with a usable numeric PID are candidates.
  if (!job || !isActiveJobStatus(job.status)) {
    return job;
  }
  if (!Number.isFinite(job.pid)) {
    return job;
  }
  if (isProcessAlive(job.pid)) {
    return job;
  }

  try {
    markDeadPidJobFailed(workspaceRoot, job.id, job.pid);
  } catch {
    // Never let reconciliation errors crash a status read.
    return job;
  }

  // Re-read so the caller sees the reconciled fields.
  // NOTE(review): readStoredJob is defined elsewhere in this file - confirm
  // it returns null/undefined (not throws) when the job file is gone.
  const refreshed = readStoredJob(workspaceRoot, job.id);
  return refreshed ? { ...job, ...refreshed } : job;
}

// Returns a new array sorted newest-first by the updatedAt timestamp
// (descending lexicographic compare on the ISO-8601 string; missing
// timestamps compare as "" and therefore sink to the end). Input is not mutated.
export function sortJobsNewestFirst(jobs) {
  const copy = [...jobs];
  copy.sort((a, b) => {
    const newer = String(b.updatedAt ?? "");
    const older = String(a.updatedAt ?? "");
    return newer.localeCompare(older);
  });
  return copy;
}
Expand Down Expand Up @@ -213,19 +309,22 @@ function matchJobReference(jobs, reference, predicate = () => true) {
export function buildStatusSnapshot(cwd, options = {}) {
const workspaceRoot = resolveWorkspaceRoot(cwd);
const config = getConfig(workspaceRoot);
const jobs = sortJobsNewestFirst(filterJobsForCurrentSession(listJobs(workspaceRoot), options));
const rawJobs = sortJobsNewestFirst(filterJobsForCurrentSession(listJobs(workspaceRoot), options));
// Reconcile any active jobs whose tracked PID is dead before partitioning,
// so a single status read surfaces stuck workers immediately.
const jobs = rawJobs.map((job) => reconcileIfDead(workspaceRoot, job));
Comment on lines +312 to +315
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Re-sort jobs after dead-PID reconciliation

buildStatusSnapshot sorts once (rawJobs) and then mutates job states via reconcileIfDead, but it never re-sorts before computing latestFinished/recent. If a stale running job near the end of the list is reconciled to failed, it keeps its old position and can be dropped by the maxJobs slice, so a single /codex:status call may reconcile a dead worker without showing it anywhere in the output. I reproduced this with >8 newer completed jobs: the reconciled dead job disappeared from both running and recent despite being newly failed.

Useful? React with 👍 / 👎.

const maxJobs = options.maxJobs ?? DEFAULT_MAX_STATUS_JOBS;
const maxProgressLines = options.maxProgressLines ?? DEFAULT_MAX_PROGRESS_LINES;

const running = jobs
.filter((job) => job.status === "queued" || job.status === "running")
.filter((job) => isActiveJobStatus(job.status))
.map((job) => enrichJob(job, { maxProgressLines }));

const latestFinishedRaw = jobs.find((job) => job.status !== "queued" && job.status !== "running") ?? null;
const latestFinishedRaw = jobs.find((job) => !isActiveJobStatus(job.status)) ?? null;
const latestFinished = latestFinishedRaw ? enrichJob(latestFinishedRaw, { maxProgressLines }) : null;

const recent = (options.all ? jobs : jobs.slice(0, maxJobs))
.filter((job) => job.status !== "queued" && job.status !== "running" && job.id !== latestFinished?.id)
.filter((job) => !isActiveJobStatus(job.status) && job.id !== latestFinished?.id)
.map((job) => enrichJob(job, { maxProgressLines }));

return {
Expand All @@ -247,9 +346,11 @@ export function buildSingleJobSnapshot(cwd, reference, options = {}) {
throw new Error(`No job found for "${reference}". Run /codex:status to inspect known jobs.`);
}

const reconciled = reconcileIfDead(workspaceRoot, selected);

return {
workspaceRoot,
job: enrichJob(selected, { maxProgressLines: options.maxProgressLines })
job: enrichJob(reconciled, { maxProgressLines: options.maxProgressLines })
};
}

Expand Down
20 changes: 20 additions & 0 deletions plugins/codex/scripts/lib/process.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ function looksLikeMissingProcessMessage(text) {
return /not found|no running instance|cannot find|does not exist|no such process/i.test(text);
}

/**
 * Checks whether a process with the given PID is still running.
 * Uses signal 0 which does not affect the process - it only checks existence.
 *
 * @param {number | null | undefined} pid
 * @returns {boolean}
 */
export function isProcessAlive(pid) {
  // Reject null/undefined, NaN, Infinity, zero, and negatives up front.
  if (!Number.isFinite(pid) || pid <= 0) {
    return false;
  }
  try {
    // Signal 0 performs only an existence/permission check.
    process.kill(pid, 0);
  } catch (error) {
    // EPERM: the process exists but we lack permission to signal it - alive.
    // ESRCH (and anything else): treat as not running.
    return error?.code === "EPERM";
  }
  return true;
}

export function terminateProcessTree(pid, options = {}) {
if (!Number.isFinite(pid)) {
return { attempted: false, delivered: false, method: null };
Expand Down
44 changes: 43 additions & 1 deletion plugins/codex/scripts/lib/tracked-jobs.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ import { readJobFile, resolveJobFile, resolveJobLogFile, upsertJob, writeJobFile

export const SESSION_ID_ENV = "CODEX_COMPANION_SESSION_ID";

// Hard ceiling for any single tracked job. Default 30 minutes is generous
// enough for long Codex turns but bounded so a hung captureTurn cannot keep
// the companion process alive forever. Override via CODEX_COMPANION_JOB_TIMEOUT_MS.
const DEFAULT_JOB_TIMEOUT_MS = 30 * 60 * 1000;

// Resolves the effective timeout. Precedence: explicit options.timeoutMs,
// then the CODEX_COMPANION_JOB_TIMEOUT_MS env override, then the default.
// Candidates that are not finite positive numbers are ignored.
function resolveJobTimeoutMs(options = {}) {
  const candidates = [options.timeoutMs, Number(process.env.CODEX_COMPANION_JOB_TIMEOUT_MS)];
  for (const value of candidates) {
    if (Number.isFinite(value) && value > 0) {
      return value;
    }
  }
  return DEFAULT_JOB_TIMEOUT_MS;
}

// Current wall-clock time as an ISO-8601 UTC string (millisecond precision).
export function nowIso() {
  const now = new Date();
  return now.toISOString();
}
Expand Down Expand Up @@ -151,8 +167,30 @@ export async function runTrackedJob(job, runner, options = {}) {
writeJobFile(job.workspaceRoot, job.id, runningRecord);
upsertJob(job.workspaceRoot, runningRecord);

// Race the runner against a hard timeout. If captureTurn or any other
// long-poll inside the runner hangs (e.g. underlying app-server died and
// never produced a terminal event), this guarantees the companion exits
// and the job transitions to a terminal status. See: openai/codex-plugin-cc#183.
const timeoutMs = resolveJobTimeoutMs(options);
let timeoutHandle = null;
const timeoutPromise = new Promise((_resolve, reject) => {
timeoutHandle = setTimeout(() => {
reject(
new Error(
`Tracked job ${job.id} exceeded the ${Math.round(timeoutMs / 1000)}s hard timeout. ` +
"The runner did not produce a terminal status. Set CODEX_COMPANION_JOB_TIMEOUT_MS to adjust."
)
);
}, timeoutMs);
timeoutHandle.unref?.();
});

try {
const execution = await runner();
const execution = await Promise.race([runner(), timeoutPromise]);
if (timeoutHandle) {
clearTimeout(timeoutHandle);
timeoutHandle = null;
}
const completionStatus = execution.exitStatus === 0 ? "completed" : "failed";
const completedAt = nowIso();
writeJobFile(job.workspaceRoot, job.id, {
Expand All @@ -179,6 +217,10 @@ export async function runTrackedJob(job, runner, options = {}) {
appendLogBlock(options.logFile ?? job.logFile ?? null, "Final output", execution.rendered);
return execution;
} catch (error) {
if (timeoutHandle) {
clearTimeout(timeoutHandle);
timeoutHandle = null;
}
const errorMessage = error instanceof Error ? error.message : String(error);
const existing = readStoredJobOrNull(job.workspaceRoot, job.id) ?? runningRecord;
const completedAt = nowIso();
Expand Down
Loading