Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions plugins/codex/scripts/codex-companion.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
} from "./lib/codex.mjs";
import { readStdinIfPiped } from "./lib/fs.mjs";
import { collectReviewContext, ensureGitRepository, resolveReviewTarget } from "./lib/git.mjs";
import { binaryAvailable, terminateProcessTree } from "./lib/process.mjs";
import { binaryAvailable, isProcessAlive, terminateProcessTree } from "./lib/process.mjs";
import { loadPromptTemplate, interpolateTemplate } from "./lib/prompts.mjs";
import {
generateJobId,
Expand All @@ -35,6 +35,7 @@ import {
import {
buildSingleJobSnapshot,
buildStatusSnapshot,
markDeadPidJobFailed,
readStoredJob,
resolveCancelableJob,
resolveResultJob,
Expand Down Expand Up @@ -288,39 +289,45 @@ function isActiveJobStatus(status) {
return status === "queued" || status === "running";
}

function getCurrentClaudeSessionId() {
return process.env[SESSION_ID_ENV] ?? null;
function normalizeTrackedPid(pid) {
const numeric = Number(pid);
return Number.isFinite(numeric) && numeric > 0 ? numeric : null;
}
Comment on lines +292 to 295
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Reintroduce session-scoping task helper functions

This hunk replaces the block that previously defined getCurrentClaudeSessionId, filterJobsForCurrentClaudeSession, and findLatestResumableTaskJob, but call sites for those names still exist in resolveLatestTrackedTaskThread and handleTaskResumeCandidate. In this commit, invoking task-resume-candidate (for example node plugins/codex/scripts/codex-companion.mjs task-resume-candidate --json) now fails with ReferenceError: getCurrentClaudeSessionId is not defined, and resume-last task flows crash for the same reason. Please restore these helpers (or update the callers to a replacement API) so resume commands remain usable.

Useful? React with 👍 / 👎.


function filterJobsForCurrentClaudeSession(jobs) {
const sessionId = getCurrentClaudeSessionId();
if (!sessionId) {
return jobs;
function reconcileDeadPidDuringWait(cwd, reference, snapshot) {
const trackedPid = normalizeTrackedPid(snapshot.job.pid);
if (!isActiveJobStatus(snapshot.job.status) || trackedPid == null || isProcessAlive(trackedPid)) {
return snapshot;
}
return jobs.filter((job) => job.sessionId === sessionId);
}

function findLatestResumableTaskJob(jobs) {
return (
jobs.find(
(job) =>
job.jobClass === "task" &&
job.threadId &&
job.status !== "queued" &&
job.status !== "running"
) ?? null
);
try {
const didFail = markDeadPidJobFailed(snapshot.workspaceRoot, snapshot.job.id, trackedPid);
if (!didFail) {
return buildSingleJobSnapshot(cwd, reference);
}
} catch (error) {
// Never let reconciliation errors crash the poll loop.
appendLogLine(
snapshot.job.logFile ?? null,
`Dead-PID reconciliation skipped due to unexpected error: ${error instanceof Error ? error.message : String(error)}`
);
return buildSingleJobSnapshot(cwd, reference);
}
return buildSingleJobSnapshot(cwd, reference);
}

async function waitForSingleJobSnapshot(cwd, reference, options = {}) {
const timeoutMs = Math.max(0, Number(options.timeoutMs) || DEFAULT_STATUS_WAIT_TIMEOUT_MS);
const pollIntervalMs = Math.max(100, Number(options.pollIntervalMs) || DEFAULT_STATUS_POLL_INTERVAL_MS);
const deadline = Date.now() + timeoutMs;
let snapshot = buildSingleJobSnapshot(cwd, reference);
let snapshot = reconcileDeadPidDuringWait(cwd, reference, buildSingleJobSnapshot(cwd, reference));

while (isActiveJobStatus(snapshot.job.status) && Date.now() < deadline) {
await sleep(Math.min(pollIntervalMs, Math.max(0, deadline - Date.now())));
snapshot = buildSingleJobSnapshot(cwd, reference);
snapshot = reconcileDeadPidDuringWait(cwd, reference, buildSingleJobSnapshot(cwd, reference));
}

if (isActiveJobStatus(snapshot.job.status)) {
snapshot = reconcileDeadPidDuringWait(cwd, reference, snapshot);
}

return {
Expand Down Expand Up @@ -852,7 +859,14 @@ async function handleStatus(argv) {
pollIntervalMs: options["poll-interval-ms"]
})
: buildSingleJobSnapshot(cwd, reference);
outputCommandResult(snapshot, renderJobStatusReport(snapshot.job), options.json);
outputCommandResult(
snapshot,
renderJobStatusReport(snapshot.job, {
waitTimedOut: Boolean(snapshot.waitTimedOut),
timeoutMs: snapshot.timeoutMs ?? null
}),
options.json
);
return;
}

Expand Down
189 changes: 176 additions & 13 deletions plugins/codex/scripts/lib/job-control.mjs
Original file line number Diff line number Diff line change
@@ -1,13 +1,159 @@
import fs from "node:fs";

import { getSessionRuntimeStatus } from "./codex.mjs";
import { getConfig, listJobs, readJobFile, resolveJobFile } from "./state.mjs";
import { SESSION_ID_ENV } from "./tracked-jobs.mjs";
import { isProcessAlive } from "./process.mjs";
import { getConfig, listJobs, readJobFile, resolveJobFile, upsertJob, writeJobFile } from "./state.mjs";
import { appendLogLine, SESSION_ID_ENV } from "./tracked-jobs.mjs";
import { resolveWorkspaceRoot } from "./workspace.mjs";

export const DEFAULT_MAX_STATUS_JOBS = 8;
export const DEFAULT_MAX_PROGRESS_LINES = 4;

function isActiveJob(job) {
return job.status === "queued" || job.status === "running";
}

function normalizeTrackedPid(pid) {
const numeric = Number(pid);
return Number.isFinite(numeric) && numeric > 0 ? numeric : null;
}

function normalizeNullable(value) {
return value ?? null;
}

function buildIndexSyncPatch(indexJob, sourceJob) {
return {
id: indexJob.id,
status: sourceJob.status ?? indexJob.status ?? null,
phase: sourceJob.phase ?? null,
pid: Number.isFinite(sourceJob.pid) ? sourceJob.pid : null,
completedAt: sourceJob.completedAt ?? null,
errorMessage: sourceJob.errorMessage ?? null,
threadId: sourceJob.threadId ?? null,
turnId: sourceJob.turnId ?? null,
summary: sourceJob.summary ?? indexJob.summary ?? null
};
}

function indexNeedsSync(indexJob, patch) {
return (
normalizeNullable(indexJob.status) !== normalizeNullable(patch.status) ||
normalizeNullable(indexJob.phase) !== normalizeNullable(patch.phase) ||
normalizeNullable(Number.isFinite(indexJob.pid) ? indexJob.pid : null) !== normalizeNullable(patch.pid) ||
normalizeNullable(indexJob.completedAt) !== normalizeNullable(patch.completedAt) ||
normalizeNullable(indexJob.errorMessage) !== normalizeNullable(patch.errorMessage) ||
normalizeNullable(indexJob.threadId) !== normalizeNullable(patch.threadId) ||
normalizeNullable(indexJob.turnId) !== normalizeNullable(patch.turnId) ||
normalizeNullable(indexJob.summary) !== normalizeNullable(patch.summary)
);
}

/**
* Marks a job as failed when its tracked process has died unexpectedly.
* Re-reads the latest persisted state from disk before writing to guard
* against races where the job completes legitimately at the same time.
* @param {string} workspaceRoot
* @param {string} jobId
* @param {number} pid - The PID we observed as dead
* @returns {boolean} true if the job was marked failed, false if skipped
*/
export function markDeadPidJobFailed(workspaceRoot, jobId, pid) {
const observedPid = normalizeTrackedPid(pid);
if (observedPid == null) {
return false;
}
const jobFile = resolveJobFile(workspaceRoot, jobId);

// Re-read the latest persisted state from disk (not from memory)
let latestJob;
try {
latestJob = readJobFile(jobFile);
} catch {
return false;
}

// Guard 1: only overwrite active states - never downgrade completed/failed
if (latestJob.status !== "queued" && latestJob.status !== "running") {
return false;
}

// Guard 2: only overwrite if the PID still matches what we observed as dead
// This prevents overwriting a job that restarted with a new PID
if (normalizeTrackedPid(latestJob.pid) !== observedPid) {
return false;
}

const completedAt = new Date().toISOString();
const errorMessage = `Process PID ${observedPid} exited unexpectedly`;

const failedPatch = {
status: "failed",
phase: "failed",
pid: null,
errorMessage,
completedAt
};

// Persist to per-job file
writeJobFile(workspaceRoot, jobId, {
...latestJob,
...failedPatch
});
appendLogLine(latestJob.logFile ?? null, `Failed: ${errorMessage}`);

// Persist to state index
upsertJob(workspaceRoot, {
id: jobId,
...failedPatch
});

return true;
}

function reconcileDeadPidJob(workspaceRoot, job) {
const trackedPid = normalizeTrackedPid(job.pid);
if (!isActiveJob(job) || trackedPid == null) {
return job;
}

if (isProcessAlive(trackedPid)) {
return job;
}

const didFail = markDeadPidJobFailed(workspaceRoot, job.id, trackedPid);
try {
const storedJob = readJobFile(resolveJobFile(workspaceRoot, job.id));
if (didFail) {
return storedJob;
}
if (!isActiveJob(storedJob)) {
upsertJob(workspaceRoot, {
id: job.id,
status: storedJob.status ?? null,
phase: storedJob.phase ?? null,
pid: Number.isFinite(storedJob.pid) ? storedJob.pid : null,
completedAt: storedJob.completedAt ?? null,
errorMessage: storedJob.errorMessage ?? null,
threadId: storedJob.threadId ?? null,
turnId: storedJob.turnId ?? null,
summary: storedJob.summary ?? job.summary ?? null
});
return {
...job,
...storedJob
};
}
return job;
} catch {
return job;
}
}

function reconcileDeadPidJobs(workspaceRoot, jobs) {
return jobs.map((job) => reconcileDeadPidJob(workspaceRoot, job));
}

export function sortJobsNewestFirst(jobs) {
return [...jobs].sort((left, right) => String(right.updatedAt ?? "").localeCompare(String(left.updatedAt ?? "")));
}
Expand Down Expand Up @@ -213,19 +359,19 @@ function matchJobReference(jobs, reference, predicate = () => true) {
export function buildStatusSnapshot(cwd, options = {}) {
const workspaceRoot = resolveWorkspaceRoot(cwd);
const config = getConfig(workspaceRoot);
const jobs = sortJobsNewestFirst(filterJobsForCurrentSession(listJobs(workspaceRoot), options));
const jobs = sortJobsNewestFirst(reconcileDeadPidJobs(workspaceRoot, filterJobsForCurrentSession(listJobs(workspaceRoot), options)));
const maxJobs = options.maxJobs ?? DEFAULT_MAX_STATUS_JOBS;
const maxProgressLines = options.maxProgressLines ?? DEFAULT_MAX_PROGRESS_LINES;

const running = jobs
.filter((job) => job.status === "queued" || job.status === "running")
.filter((job) => isActiveJob(job))
.map((job) => enrichJob(job, { maxProgressLines }));

const latestFinishedRaw = jobs.find((job) => job.status !== "queued" && job.status !== "running") ?? null;
const latestFinishedRaw = jobs.find((job) => !isActiveJob(job)) ?? null;
const latestFinished = latestFinishedRaw ? enrichJob(latestFinishedRaw, { maxProgressLines }) : null;

const recent = (options.all ? jobs : jobs.slice(0, maxJobs))
.filter((job) => job.status !== "queued" && job.status !== "running" && job.id !== latestFinished?.id)
.filter((job) => !isActiveJob(job) && job.id !== latestFinished?.id)
.map((job) => enrichJob(job, { maxProgressLines }));

return {
Expand All @@ -241,32 +387,49 @@ export function buildStatusSnapshot(cwd, options = {}) {

export function buildSingleJobSnapshot(cwd, reference, options = {}) {
const workspaceRoot = resolveWorkspaceRoot(cwd);
const jobs = sortJobsNewestFirst(listJobs(workspaceRoot));
const jobs = sortJobsNewestFirst(reconcileDeadPidJobs(workspaceRoot, listJobs(workspaceRoot)));
const selected = matchJobReference(jobs, reference);
if (!selected) {
throw new Error(`No job found for "${reference}". Run /codex:status to inspect known jobs.`);
}

let storedJob = null;
try {
storedJob = readStoredJob(workspaceRoot, selected.id);
} catch {
storedJob = null;
}

const latest = storedJob ? { ...selected, ...storedJob } : selected;
if (storedJob) {
const indexPatch = buildIndexSyncPatch(selected, latest);
if (indexNeedsSync(selected, indexPatch)) {
upsertJob(workspaceRoot, indexPatch);
}
}

return {
workspaceRoot,
job: enrichJob(selected, { maxProgressLines: options.maxProgressLines })
job: enrichJob(latest, { maxProgressLines: options.maxProgressLines })
};
}

export function resolveResultJob(cwd, reference) {
const workspaceRoot = resolveWorkspaceRoot(cwd);
const jobs = sortJobsNewestFirst(reference ? listJobs(workspaceRoot) : filterJobsForCurrentSession(listJobs(workspaceRoot)));
const jobs = sortJobsNewestFirst(
reconcileDeadPidJobs(workspaceRoot, reference ? listJobs(workspaceRoot) : filterJobsForCurrentSession(listJobs(workspaceRoot)))
);
const selected = matchJobReference(
jobs,
reference,
(job) => job.status === "completed" || job.status === "failed" || job.status === "cancelled"
(job) => !isActiveJob(job)
);

if (selected) {
return { workspaceRoot, job: selected };
}

const active = matchJobReference(jobs, reference, (job) => job.status === "queued" || job.status === "running");
const active = matchJobReference(jobs, reference, (job) => isActiveJob(job));
if (active) {
throw new Error(`Job ${active.id} is still ${active.status}. Check /codex:status and try again once it finishes.`);
}
Expand All @@ -280,8 +443,8 @@ export function resolveResultJob(cwd, reference) {

export function resolveCancelableJob(cwd, reference, options = {}) {
const workspaceRoot = resolveWorkspaceRoot(cwd);
const jobs = sortJobsNewestFirst(listJobs(workspaceRoot));
const activeJobs = jobs.filter((job) => job.status === "queued" || job.status === "running");
const jobs = sortJobsNewestFirst(reconcileDeadPidJobs(workspaceRoot, listJobs(workspaceRoot)));
const activeJobs = jobs.filter((job) => isActiveJob(job));

if (reference) {
const selected = matchJobReference(activeJobs, reference);
Expand Down
20 changes: 20 additions & 0 deletions plugins/codex/scripts/lib/process.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ function looksLikeMissingProcessMessage(text) {
return /not found|no running instance|cannot find|does not exist|no such process/i.test(text);
}

/**
* Checks whether a process with the given PID is still running.
* Uses signal 0 which does not kill the process - it only checks existence.
* @param {number | null | undefined} pid
* @returns {boolean}
*/
export function isProcessAlive(pid) {
if (pid == null || !Number.isFinite(pid) || pid <= 0) {
return false;
}
try {
process.kill(pid, 0);
return true;
} catch (error) {
// ESRCH = no such process (dead)
// EPERM = process exists but no permission to send signals (still alive)
return error?.code === "EPERM";
}
}

export function terminateProcessTree(pid, options = {}) {
if (!Number.isFinite(pid)) {
return { attempted: false, delivered: false, method: null };
Expand Down
Loading