From db4b45f96d443c139d420204e684f9a0afcea983 Mon Sep 17 00:00:00 2001 From: Allen091080 Date: Sun, 24 May 2026 13:43:48 +0800 Subject: [PATCH] ds4-server: watchdog thread + decode-loop SSE keepalive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The inference worker can stall in the underlying GPU/Metal kernel calls (`ds4_session_sample` / `ds4_session_eval` / `ds4_session_eval_speculative_argmax`). Those calls have no cancellation points, so SIGTERM cannot drain a stuck turn — we have seen 12+ hour ds4-server processes wedged with a single chat request in flight, requiring SIGKILL to recover. The SSE keepalive that landed earlier covers prefill but not decode, so once the worker gets past prefill and enters decode, a Metal stall is again invisible to the client until its own idle timeout. Add a dedicated `watchdog_main` thread that polls `worker_last_progress` while `worker_in_job` is set. Two thresholds: soft stall (default 60s) — set `worker_abort_requested` so the decode loop bails on its next iteration; the in-flight HTTP client gets an `error` finish_reason and the worker continues to pick up future jobs. hard stall (default 120s) — call `_exit(137)`. The launchd KeepAlive supervisor restarts us immediately; the on-disk KV cache survives so the restarted process can usually resume the client's last prompt prefix from cache instead of re-prefilling from token zero. The decode loop now: - checks `worker_abort_requested` at the top of each iteration and breaks with a clear `finish=error` reason when the watchdog has requested an abort; - emits a `: decode\n\n` SSE comment line every 15 seconds when `j->req.stream` is set, mirroring the prefill keepalive from the earlier patch so reasoning-only stretches (`...`) or slow tool-input phases don't trip client TCP idle-timeouts either; - refreshes `worker_last_progress` (relaxed atomic store) after each produced token so the watchdog can distinguish "alive but slow" from "wedged". `server_progress_cb` also refreshes `worker_last_progress` on every `prefill_chunk` event. Without this, very long prefills (≥ soft threshold for big prompts) would be mistaken for Metal stalls — chunks fire every ~7s in practice and reset the timestamp before the 60s soft limit, so prefill is never falsely aborted. Field layout: - `worker_tid` / `watchdog_tid` / `watchdog_running` / thresholds - 3 cross-thread atomics: `worker_last_progress`, `worker_in_job`, `worker_abort_requested` (relaxed __atomic_* loads/stores) The watchdog only acts while `worker_in_job` is set — idle workers don't advance progress and we must not interpret that as a stall. Setup-time misconfigurations (model load failure, port conflict) are not covered; they fail synchronously before any job is ever dequeued. Verified on macOS Metal, q2-imatrix GGUF, ctx=200000: - `make` clean build, no new warnings - `./ds4_test --server` passes - Real running ds4-server protected: in earlier 12+ hour run we saw a hard wedge that needed SIGKILL; with this patch a follow-up run hit the soft trigger ("WATCHDOG soft stall 62s >= 60s — requesting decode-loop abort") and the client got a clean error within 70s instead of hanging indefinitely. Co-Authored-By: Claude Opus 4.7 (1M context) --- ds4_server.c | 134 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/ds4_server.c b/ds4_server.c index 62fabc1b..104cdd0d 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -7702,6 +7702,23 @@ struct server { FILE *trace; pthread_mutex_t trace_mu; uint64_t trace_seq; + /* Watchdog: the inference worker can stall in the underlying GPU/Metal + * kernel calls in `ds4_session_eval*` / `ds4_session_sample`. Those + * calls have no cancellation points, so SIGTERM cannot drain a stuck + * turn. A dedicated watchdog thread polls `worker_last_progress` while + * `worker_in_job` is set: a soft stall flips `worker_abort_requested` + * so the decode loop bails on its next iteration, a hard stall calls + * `_exit(137)` so the launchd `KeepAlive` policy can restart us. */ + pthread_t worker_tid; + pthread_t watchdog_tid; + bool watchdog_running; + int watchdog_stuck_soft_s; + int watchdog_stuck_hard_s; + /* These three are accessed via __atomic_* across threads and must keep + * relaxed-atomic semantics — do not read/write them with plain `=`. */ + volatile long worker_last_progress; + volatile int worker_in_job; + volatile int worker_abort_requested; }; /* Jobs are stack-owned by the client thread. The worker signals completion @@ -9505,6 +9522,15 @@ static void server_progress_cb(void *ud, const char *event, int current, int tot server_prefill_progress *p = ud; if (!p || !event || strcmp(event, "prefill_chunk")) return; + /* Prefill chunks are real worker progress — refresh the watchdog + * timestamp so a long prefill (≥ soft threshold for big prompts) is + * not mistaken for a Metal stall. Without this, prompts over + * ~18-20k tokens get the decode loop aborted as soon as prefill + * finishes, because no token was produced for >60s. */ + if (p->srv) { + __atomic_store_n(&p->srv->worker_last_progress, (long)time(NULL), __ATOMIC_RELAXED); + } + double now = now_sec(); /* Keep the HTTP/SSE connection alive while prefill runs. We write the SSE * response headers the first time the callback fires and then emit a @@ -10240,6 +10266,11 @@ static void generate_job(server *s, job *j) { const double decode_t0 = now_sec(); double last_decode_log_t = decode_t0; int last_decode_log_completion = 0; + /* SSE keepalive emitted at most every 15s while decode is running. + * Reasoning-only stretches (`...`) and slow tool-input phases + * produce no streamable bytes; without these comment lines the client + * TCP idle-timeout closes the connection. */ + double decode_last_keepalive = decode_t0; thinking_state thinking = thinking_state_from_prompt(&j->req); const bool thinking_gates_tool_markers = ds4_think_mode_enabled(j->req.think_mode); bool tool_scan_waiting_for_think_close = @@ -10249,6 +10280,31 @@ static void generate_job(server *s, job *j) { while (!g_stop_requested && completion < max_tokens && ds4_session_pos(s->session) < ds4_session_ctx(s->session)) { + /* Watchdog-requested abort: the GPU/Metal kernel underneath us has + * been silent past `worker_stuck_soft_s`. Bail before issuing the + * next sample call — the next call would just stall too. */ + if (__atomic_load_n(&s->worker_abort_requested, __ATOMIC_RELAXED)) { + server_log(DS4_LOG_DEFAULT, + "ds4-server: decode loop aborting on watchdog request after %d tokens", + completion); + finish = "error"; + snprintf(err, sizeof(err), "worker aborted by watchdog (likely GPU/Metal stall)"); + break; + } + /* SSE keepalive — best-effort, identical contract to the prefill + * keepalive at server_progress_cb above. */ + if (j->req.stream) { + double now_kp = now_sec(); + if (now_kp - decode_last_keepalive >= 15.0) { + static const char ka[] = ": decode\n\n"; + if (!send_all(j->fd, ka, sizeof(ka) - 1)) { + finish = "error"; + snprintf(err, sizeof(err), "client stream write failed during decode heartbeat"); + break; + } + decode_last_keepalive = now_kp; + } + } dsml_decode_state dsml_state = j->req.kind == REQ_CHAT && j->req.has_tools ? dsml_tracker.decode : DSML_DECODE_OUTSIDE; const bool in_tool_call = dsml_decode_state_is_tool(dsml_state); @@ -10313,6 +10369,9 @@ static void generate_job(server *s, job *j) { size_t piece_len = 0; char *piece = ds4_token_text(s->engine, token, &piece_len); completion++; + /* Tell the watchdog the worker is alive. Relaxed atomic store; + * the watchdog only reads it on a separate 5s polling cadence. */ + __atomic_store_n(&s->worker_last_progress, (long)time(NULL), __ATOMIC_RELAXED); trace_piece(s, trace_id, piece, piece_len); buf_append(&text, piece, piece_len); @@ -10898,10 +10957,19 @@ static job *dequeue(server *s) { static void *worker_main(void *arg) { server *s = arg; + s->worker_tid = pthread_self(); for (;;) { job *j = dequeue(s); if (!j) break; + /* Mark the worker as actively processing a job and seed the + * progress timestamp. The decode loop updates the timestamp on + * every produced token; the watchdog thread polls both fields. */ + __atomic_store_n(&s->worker_last_progress, (long)time(NULL), __ATOMIC_RELAXED); + __atomic_store_n(&s->worker_abort_requested, 0, __ATOMIC_RELAXED); + __atomic_store_n(&s->worker_in_job, 1, __ATOMIC_RELAXED); generate_job(s, j); + __atomic_store_n(&s->worker_in_job, 0, __ATOMIC_RELAXED); + __atomic_store_n(&s->worker_last_progress, 0, __ATOMIC_RELAXED); pthread_mutex_lock(&j->mu); j->done = true; pthread_cond_signal(&j->cv); @@ -10910,6 +10978,53 @@ static void *worker_main(void *arg) { return NULL; } +/* Watchdog thread: detects when the worker has been silent for too long, + * which in practice means the underlying GPU/Metal kernel call wedged. + * + * soft stall (worker_stuck_soft_s, default 60s): + * Set `worker_abort_requested` so the decode loop will bail on its + * next iteration. This is the gentle path — the in-flight HTTP + * client gets an `error` finish_reason and the worker continues + * picking up future jobs. + * hard stall (worker_stuck_hard_s, default 120s): + * Call `_exit(137)`. The process-level supervisor (launchd + * `KeepAlive=true`) immediately restarts us. All in-flight clients + * lose their connection, but the on-disk KV cache survives so the + * restarted process can usually resume their last prompt prefix + * from cache instead of re-prefilling from token zero. + * + * The watchdog ONLY acts while `worker_in_job` is set — idle workers do + * not advance `worker_last_progress`, and we must not interpret that as + * a stall. Setup-time misconfigurations (model load failure, port + * conflict) are not covered by this watchdog; they fail synchronously + * before any job is ever dequeued. */ +static void *watchdog_main(void *arg) { + server *s = arg; + while (!g_stop_requested) { + sleep(5); + if (g_stop_requested) break; + if (!__atomic_load_n(&s->worker_in_job, __ATOMIC_RELAXED)) continue; + time_t last = (time_t)__atomic_load_n(&s->worker_last_progress, __ATOMIC_RELAXED); + if (last == 0) continue; + time_t now = time(NULL); + long stale = (long)(now - last); + if (stale >= s->watchdog_stuck_hard_s) { + server_log(DS4_LOG_DEFAULT, + "ds4-server: WATCHDOG hard stall %lds >= %ds — exiting with 137 so launchd can restart", + stale, s->watchdog_stuck_hard_s); + fflush(NULL); + _exit(137); + } else if (stale >= s->watchdog_stuck_soft_s && + !__atomic_load_n(&s->worker_abort_requested, __ATOMIC_RELAXED)) { + server_log(DS4_LOG_DEFAULT, + "ds4-server: WATCHDOG soft stall %lds >= %ds — requesting decode-loop abort", + stale, s->watchdog_stuck_soft_s); + __atomic_store_n(&s->worker_abort_requested, 1, __ATOMIC_RELAXED); + } + } + return NULL; +} + typedef struct { char method[8]; char path[256]; @@ -11572,9 +11687,26 @@ int main(int argc, char **argv) { server_log(DS4_LOG_DEFAULT, "ds4-server: tracing session to %s", cfg.trace_path); } + /* Watchdog defaults: soft 60s (request graceful decode abort), hard + * 120s (process exit so launchd KeepAlive can restart us cleanly). */ + if (s.watchdog_stuck_soft_s == 0) s.watchdog_stuck_soft_s = 60; + if (s.watchdog_stuck_hard_s == 0) s.watchdog_stuck_hard_s = 120; + pthread_t worker; if (pthread_create(&worker, NULL, worker_main, &s) != 0) die("failed to start worker"); + pthread_t watchdog; + if (pthread_create(&watchdog, NULL, watchdog_main, &s) != 0) { + server_log(DS4_LOG_DEFAULT, + "ds4-server: WARNING failed to start watchdog thread, continuing without stall protection"); + } else { + s.watchdog_tid = watchdog; + s.watchdog_running = true; + server_log(DS4_LOG_DEFAULT, + "ds4-server: watchdog active (soft=%ds hard=%ds)", + s.watchdog_stuck_soft_s, s.watchdog_stuck_hard_s); + } + int lfd = listen_on(cfg.host, cfg.port); if (lfd < 0) { server_log(DS4_LOG_DEFAULT, "ds4-server: failed to listen on %s:%d: %s", cfg.host, cfg.port, strerror(errno)); @@ -11583,6 +11715,7 @@ int main(int argc, char **argv) { pthread_cond_broadcast(&s.cv); pthread_mutex_unlock(&s.mu); pthread_join(worker, NULL); + if (s.watchdog_running) pthread_join(s.watchdog_tid, NULL); server_close_resources(&s); return 1; } @@ -11632,6 +11765,7 @@ int main(int argc, char **argv) { pthread_cond_broadcast(&s.cv); pthread_mutex_unlock(&s.mu); pthread_join(worker, NULL); + if (s.watchdog_running) pthread_join(s.watchdog_tid, NULL); pthread_mutex_lock(&s.mu); while (s.clients > 0) pthread_cond_wait(&s.clients_cv, &s.mu); pthread_mutex_unlock(&s.mu);