
Commit 07c210b

audichuang and claude committed
fix: add captureTurn transport watchdog and runTrackedJob hard timeout
Completes the defense-in-depth strategy for stuck jobs. The plugin's job state machine is single-writer: only runTrackedJob writes terminal status. But that writer sits behind a chain of async promises (transport -> captureTurn -> runner), and any link in that chain can break silently, freezing state.json at status:"running" forever. Previous commits handled the read side (dead-PID reconciliation with TOCTOU guards) and UX transparency (idle surfacing, relative log timestamps). This commit closes the write side with two additional layers.

Layer 1 — captureTurn transport watchdog (lib/codex.mjs): state.completion resolves only via turn/completed or an inferred 250ms final_answer timer. If the app-server (direct or via broker) disconnects before either signal, the promise hangs forever. Race it against client.exitPromise: on transport close, treat a seen final_answer as inferred success; otherwise reject with a clear "app-server disconnected before the turn completed" error. This covers the captureTurn-hang failure mode reported in the wild, where the codex CLI exits cleanly (exit 0) without sending turn/completed and the companion waits forever.

Layer 2 — runTrackedJob hard timeout (lib/tracked-jobs.mjs): even if every inner watchdog fails, Promise.race the runner against a configurable hard cap (default 15m, overridable via the CODEX_JOB_TIMEOUT_MS env var or a per-call timeoutMs). On timeout, write status:"failed" with timedOut:true and a descriptive error. The job record also carries timeoutAt so /codex:status can render "hard timeout in 59s" when a job is also flagged staleLog — telling the user the escape hatch is imminent instead of leaving them guessing how long to wait.

Combined coverage:
- worker process dies silently -> dead-PID reconciliation
- app-server disconnects mid-turn -> captureTurn watchdog
- runner hangs in any other way -> hard timeout
- user can always see what happened -> UX layer

Refs upstream openai#243, openai#184, openai#222, openai#183, openai#164.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6847dab commit 07c210b

4 files changed

Lines changed: 99 additions & 12 deletions


plugins/codex/scripts/lib/codex.mjs

Lines changed: 25 additions & 0 deletions
@@ -597,6 +597,31 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
       completeTurn(state, response.turn);
     }

+    // Transport-level watchdog: if the app-server (direct or via broker)
+    // disconnects before we see turn/completed or a final_answer-phase
+    // agentMessage, state.completion will never resolve on its own. Race
+    // against client.exitPromise so the companion always reaches a terminal
+    // state. This avoids the "captureTurn hangs indefinitely" failure mode
+    // that leaves a job stuck at status:running even though no worker exists.
+    const transportWatchdog = client.exitPromise.then(() => {
+      if (state.completed) return;
+      if (state.finalAnswerSeen) {
+        // final_answer was seen; the 250ms inferred-completion timer may or
+        // may not have fired, and the transport closed before turn/completed.
+        // Treat as inferred success — this is the same fallback the existing
+        // scheduleInferredCompletion path uses.
+        completeTurn(state, null, { inferred: true });
+        return;
+      }
+      const exitError =
+        client.exitError ?? state.error ??
+        new Error("Codex app-server disconnected before the turn completed (no turn/completed or final_answer received).");
+      state.error = state.error ?? exitError;
+      state.rejectCompletion(exitError);
+    });
+    // Swallow unhandled-rejection noise; we only care that the watchdog fires.
+    transportWatchdog.catch?.(() => {});
+
     return await state.completion;
   } finally {
     clearCompletionTimer(state);
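
For readers unfamiliar with the failure mode, here is a standalone sketch of the pattern (all names are hypothetical stand-ins; the real wiring is in the diff above): a deferred completion that only a message handler can resolve will strand its awaiter forever if the transport dies first, so the exit promise is hooked up to reject it.

    // deferred() and the 100ms exit timer are illustrative stand-ins for
    // state.completion and client.exitPromise in codex.mjs.
    function deferred() {
      let resolve, reject;
      const promise = new Promise((res, rej) => { resolve = res; reject = rej; });
      return { promise, resolve, reject };
    }

    const completion = deferred(); // only a turn/completed handler would resolve this
    const exitPromise = new Promise((res) => setTimeout(res, 100)); // transport dies

    // Without this hook, `await completion.promise` would hang forever.
    exitPromise.then(() => {
      completion.reject(new Error("app-server disconnected before the turn completed"));
    });

    completion.promise.catch((err) => console.error(err.message));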

plugins/codex/scripts/lib/job-control.mjs

Lines changed: 8 additions & 1 deletion
@@ -203,6 +203,10 @@ export function enrichJob(job, options = {}) {
   const isActive = job.status === "queued" || job.status === "running";
   const idle = isActive ? computeIdleInfo(job.logFile) : null;

+  const timeoutAt = typeof job.timeoutAt === "string" ? Date.parse(job.timeoutAt) : null;
+  const timeoutRemainingMs =
+    isActive && Number.isFinite(timeoutAt) ? Math.max(0, timeoutAt - Date.now()) : null;
+
   const enriched = {
     ...job,
     kindLabel: getJobTypeLabel(job),
@@ -218,7 +222,10 @@
     lastActivityAt: idle?.lastActivityAt ?? null,
     idleForMs: idle?.idleForMs ?? null,
     idleFor: idle?.idleFor ?? null,
-    staleLog: Boolean(idle && idle.idleForMs > STALE_LOG_THRESHOLD_MS)
+    staleLog: Boolean(idle && idle.idleForMs > STALE_LOG_THRESHOLD_MS),
+    timeoutRemainingMs,
+    timeoutRemaining:
+      timeoutRemainingMs == null ? null : formatRelativeAgo(timeoutRemainingMs)?.replace(/ ago$/, "")
   };

   return {
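
The timeoutRemaining string piggybacks on the existing formatRelativeAgo helper rather than introducing a second duration formatter; stripping the trailing " ago" turns a past-tense phrase into a bare duration. Assuming formatRelativeAgo(59_000) yields "59s ago" (shape inferred from context, not verified):

    // "59s ago" -> "59s", ready for "hard timeout in 59s" in render.mjs.
    const timeoutRemaining = "59s ago".replace(/ ago$/, "");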

plugins/codex/scripts/lib/render.mjs

Lines changed: 9 additions & 1 deletion
@@ -126,6 +126,11 @@ function pushJobDetails(lines, job, options = {}) {
   if (job.autoReconciled) {
     const pidSuffix = job.reconciledDeadPid ? ` (PID ${job.reconciledDeadPid})` : "";
     lines.push(` ! Auto-reconciled as failed: worker process${pidSuffix} exited without reporting.`);
+  } else if (job.timedOut) {
+    lines.push(` ! Hard timeout: runner watchdog aborted the job after exceeding the configured duration.`);
+    if (job.errorMessage) {
+      lines.push(`   Error: ${job.errorMessage}`);
+    }
   } else if (job.errorMessage && (job.status === "failed" || job.status === "cancelled")) {
     lines.push(`   Error: ${job.errorMessage}`);
   }
@@ -140,7 +145,10 @@
   }
   if (options.showElapsed && job.idleFor) {
     const marker = job.staleLog ? " ! " : "   ";
-    lines.push(`${marker}Last activity: ${job.idleFor}${job.staleLog ? " — no log output; process may be stuck" : ""}`);
+    const timeoutHint =
+      job.staleLog && job.timeoutRemaining ? ` (hard timeout in ${job.timeoutRemaining})` : "";
+    const stuckHint = job.staleLog ? " — no log output; process may be stuck" : "";
+    lines.push(`${marker}Last activity: ${job.idleFor}${stuckHint}${timeoutHint}`);
   }
   if (options.showDuration && job.duration) {
     lines.push(`   Duration: ${job.duration}`);
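
Putting the render changes together, a running job with a stale log and a known timeoutAt would surface roughly like this (illustrative values):

     ! Last activity: 6m — no log output; process may be stuck (hard timeout in 59s)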

plugins/codex/scripts/lib/tracked-jobs.mjs

Lines changed: 57 additions & 10 deletions
@@ -4,11 +4,30 @@ import process from "node:process";
 import { readJobFile, resolveJobFile, resolveJobLogFile, upsertJob, writeJobFile } from "./state.mjs";

 export const SESSION_ID_ENV = "CODEX_COMPANION_SESSION_ID";
+export const JOB_TIMEOUT_ENV = "CODEX_JOB_TIMEOUT_MS";
+export const DEFAULT_JOB_TIMEOUT_MS = 15 * 60 * 1000;

 export function nowIso() {
   return new Date().toISOString();
 }

+function resolveJobTimeoutMs(options) {
+  if (Number.isFinite(options.timeoutMs) && options.timeoutMs > 0) {
+    return options.timeoutMs;
+  }
+  const envValue = Number(process.env[JOB_TIMEOUT_ENV]);
+  if (Number.isFinite(envValue) && envValue > 0) {
+    return envValue;
+  }
+  return DEFAULT_JOB_TIMEOUT_MS;
+}
+
+function formatTimeoutHuman(ms) {
+  if (ms >= 60_000) return `${Math.round(ms / 60_000)}m`;
+  if (ms >= 1000) return `${Math.round(ms / 1000)}s`;
+  return `${ms}ms`;
+}
+
 function normalizeProgressEvent(value) {
   if (value && typeof value === "object" && !Array.isArray(value)) {
     return {
@@ -140,19 +159,41 @@ function readStoredJobOrNull(workspaceRoot, jobId) {
 }

 export async function runTrackedJob(job, runner, options = {}) {
+  const timeoutMs = resolveJobTimeoutMs(options);
   const runningRecord = {
     ...job,
     status: "running",
     startedAt: nowIso(),
     phase: "starting",
     pid: process.pid,
-    logFile: options.logFile ?? job.logFile ?? null
+    logFile: options.logFile ?? job.logFile ?? null,
+    timeoutAt: new Date(Date.now() + timeoutMs).toISOString(),
+    timeoutMs
   };
   writeJobFile(job.workspaceRoot, job.id, runningRecord);
   upsertJob(job.workspaceRoot, runningRecord);

+  // Layer-2 watchdog: no matter what goes wrong inside runner (captureTurn
+  // hang, broker wedge, internal deadlock), this timer guarantees the job
+  // reaches a terminal state. Layer 1 (captureTurn exitPromise watchdog) and
+  // layer 3 (listJobs dead-PID reconciliation) catch most cases — this is
+  // the backstop for the rest.
+  let timeoutHandle = null;
+  let timedOut = false;
+  const timeoutPromise = new Promise((_resolve, reject) => {
+    timeoutHandle = setTimeout(() => {
+      timedOut = true;
+      reject(new Error(`Tracked job ${job.id} exceeded the ${formatTimeoutHuman(timeoutMs)} hard timeout and was aborted by the runner watchdog.`));
+    }, timeoutMs);
+    timeoutHandle.unref?.();
+  });
+
   try {
-    const execution = await runner();
+    const execution = await Promise.race([runner(), timeoutPromise]);
+    if (timeoutHandle) {
+      clearTimeout(timeoutHandle);
+      timeoutHandle = null;
+    }
     const completionStatus = execution.exitStatus === 0 ? "completed" : "failed";
     const completedAt = nowIso();
     writeJobFile(job.workspaceRoot, job.id, {
@@ -179,25 +220,31 @@
     appendLogBlock(options.logFile ?? job.logFile ?? null, "Final output", execution.rendered);
     return execution;
   } catch (error) {
+    if (timeoutHandle) {
+      clearTimeout(timeoutHandle);
+      timeoutHandle = null;
+    }
     const errorMessage = error instanceof Error ? error.message : String(error);
     const existing = readStoredJobOrNull(job.workspaceRoot, job.id) ?? runningRecord;
     const completedAt = nowIso();
-    writeJobFile(job.workspaceRoot, job.id, {
-      ...existing,
+    const failurePatch = {
       status: "failed",
       phase: "failed",
       errorMessage,
       pid: null,
-      completedAt,
+      completedAt
+    };
+    if (timedOut) {
+      failurePatch.timedOut = true;
+    }
+    writeJobFile(job.workspaceRoot, job.id, {
+      ...existing,
+      ...failurePatch,
       logFile: options.logFile ?? job.logFile ?? existing.logFile ?? null
     });
     upsertJob(job.workspaceRoot, {
       id: job.id,
-      status: "failed",
-      phase: "failed",
-      pid: null,
-      errorMessage,
-      completedAt
+      ...failurePatch
     });
     throw error;
   }
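
A sketch of how a caller might exercise the new knobs (the minimal job shape and the never-settling runner are assumptions for the demo; runTrackedJob, timeoutMs, and CODEX_JOB_TIMEOUT_MS are from this commit). Per resolveJobTimeoutMs, a per-call timeoutMs beats the env var, which beats the 15m default; on expiry the catch path records status:"failed" with timedOut:true.

    import { runTrackedJob } from "./tracked-jobs.mjs";

    const job = { id: "demo-job", workspaceRoot: process.cwd() }; // minimal shape, assumed
    const neverSettles = () => new Promise(() => {});             // simulated wedged runner

    try {
      // 1s cap for the demo; without timeoutMs this would read
      // CODEX_JOB_TIMEOUT_MS, then fall back to DEFAULT_JOB_TIMEOUT_MS (15m).
      await runTrackedJob(job, neverSettles, { timeoutMs: 1000 });
    } catch (error) {
      // state.json now holds status:"failed", timedOut:true, and this message.
      console.error(error.message); // "Tracked job demo-job exceeded the 1s hard timeout ..."
    }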
