Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions ds4_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -7702,6 +7702,23 @@ struct server {
FILE *trace;
pthread_mutex_t trace_mu;
uint64_t trace_seq;
/* Watchdog: the inference worker can stall in the underlying GPU/Metal
* kernel calls in `ds4_session_eval*` / `ds4_session_sample`. Those
* calls have no cancellation points, so SIGTERM cannot drain a stuck
* turn. A dedicated watchdog thread polls `worker_last_progress` while
* `worker_in_job` is set: a soft stall flips `worker_abort_requested`
* so the decode loop bails on its next iteration, a hard stall calls
* `_exit(137)` so the launchd `KeepAlive` policy can restart us. */
pthread_t worker_tid;
pthread_t watchdog_tid;
bool watchdog_running;
int watchdog_stuck_soft_s;
int watchdog_stuck_hard_s;
/* These three are accessed via __atomic_* across threads and must keep
* relaxed-atomic semantics — do not read/write them with plain `=`. */
volatile long worker_last_progress;
volatile int worker_in_job;
volatile int worker_abort_requested;
};

/* Jobs are stack-owned by the client thread. The worker signals completion
Expand Down Expand Up @@ -9505,6 +9522,15 @@ static void server_progress_cb(void *ud, const char *event, int current, int tot
server_prefill_progress *p = ud;
if (!p || !event || strcmp(event, "prefill_chunk")) return;

/* Prefill chunks are real worker progress — refresh the watchdog
* timestamp so a long prefill (≥ soft threshold for big prompts) is
* not mistaken for a Metal stall. Without this, prompts over
* ~18-20k tokens get the decode loop aborted as soon as prefill
* finishes, because no token was produced for >60s. */
if (p->srv) {
__atomic_store_n(&p->srv->worker_last_progress, (long)time(NULL), __ATOMIC_RELAXED);
}

double now = now_sec();
/* Keep the HTTP/SSE connection alive while prefill runs. We write the SSE
* response headers the first time the callback fires and then emit a
Expand Down Expand Up @@ -10240,6 +10266,11 @@ static void generate_job(server *s, job *j) {
const double decode_t0 = now_sec();
double last_decode_log_t = decode_t0;
int last_decode_log_completion = 0;
/* SSE keepalive emitted at most every 15s while decode is running.
* Reasoning-only stretches (`<think>...`) and slow tool-input phases
* produce no streamable bytes; without these comment lines the client
* TCP idle-timeout closes the connection. */
double decode_last_keepalive = decode_t0;
thinking_state thinking = thinking_state_from_prompt(&j->req);
const bool thinking_gates_tool_markers = ds4_think_mode_enabled(j->req.think_mode);
bool tool_scan_waiting_for_think_close =
Expand All @@ -10249,6 +10280,31 @@ static void generate_job(server *s, job *j) {

while (!g_stop_requested && completion < max_tokens &&
ds4_session_pos(s->session) < ds4_session_ctx(s->session)) {
/* Watchdog-requested abort: the GPU/Metal kernel underneath us has
* been silent past `watchdog_stuck_soft_s`. Bail before issuing the
* next sample call — the next call would just stall too. */
if (__atomic_load_n(&s->worker_abort_requested, __ATOMIC_RELAXED)) {
server_log(DS4_LOG_DEFAULT,
"ds4-server: decode loop aborting on watchdog request after %d tokens",
completion);
finish = "error";
snprintf(err, sizeof(err), "worker aborted by watchdog (likely GPU/Metal stall)");
break;
}
/* SSE keepalive — best-effort, identical contract to the prefill
* keepalive at server_progress_cb above. */
if (j->req.stream) {
double now_kp = now_sec();
if (now_kp - decode_last_keepalive >= 15.0) {
static const char ka[] = ": decode\n\n";
if (!send_all(j->fd, ka, sizeof(ka) - 1)) {
finish = "error";
snprintf(err, sizeof(err), "client stream write failed during decode heartbeat");
break;
}
decode_last_keepalive = now_kp;
}
}
dsml_decode_state dsml_state = j->req.kind == REQ_CHAT && j->req.has_tools ?
dsml_tracker.decode : DSML_DECODE_OUTSIDE;
const bool in_tool_call = dsml_decode_state_is_tool(dsml_state);
Expand Down Expand Up @@ -10313,6 +10369,9 @@ static void generate_job(server *s, job *j) {
size_t piece_len = 0;
char *piece = ds4_token_text(s->engine, token, &piece_len);
completion++;
/* Tell the watchdog the worker is alive. Relaxed atomic store;
* the watchdog only reads it on a separate 5s polling cadence. */
__atomic_store_n(&s->worker_last_progress, (long)time(NULL), __ATOMIC_RELAXED);

trace_piece(s, trace_id, piece, piece_len);
buf_append(&text, piece, piece_len);
Expand Down Expand Up @@ -10898,10 +10957,19 @@ static job *dequeue(server *s) {

static void *worker_main(void *arg) {
server *s = arg;
s->worker_tid = pthread_self();
for (;;) {
job *j = dequeue(s);
if (!j) break;
/* Mark the worker as actively processing a job and seed the
* progress timestamp. The decode loop updates the timestamp on
* every produced token; the watchdog thread polls both fields. */
__atomic_store_n(&s->worker_last_progress, (long)time(NULL), __ATOMIC_RELAXED);
__atomic_store_n(&s->worker_abort_requested, 0, __ATOMIC_RELAXED);
__atomic_store_n(&s->worker_in_job, 1, __ATOMIC_RELAXED);
generate_job(s, j);
__atomic_store_n(&s->worker_in_job, 0, __ATOMIC_RELAXED);
__atomic_store_n(&s->worker_last_progress, 0, __ATOMIC_RELAXED);
pthread_mutex_lock(&j->mu);
j->done = true;
pthread_cond_signal(&j->cv);
Expand All @@ -10910,6 +10978,53 @@ static void *worker_main(void *arg) {
return NULL;
}

/* Watchdog thread entry point.
 *
 * `arg` is the `server *` shared with the worker thread. Every 5 seconds,
 * until `g_stop_requested` becomes true, the thread measures how long ago
 * the worker last refreshed `worker_last_progress` (wall-clock seconds from
 * time()) and escalates in two stages:
 *
 *   soft stall (age >= `watchdog_stuck_soft_s`):
 *     Raise `worker_abort_requested` once, so the decode loop can bail on
 *     its next iteration. The flag is only written here when it is not
 *     already set, which also keeps the log line from repeating on every
 *     poll. This is the gentle path: the process keeps running.
 *   hard stall (age >= `watchdog_stuck_hard_s`):
 *     Log, flush all stdio streams and `_exit(137)`. The intent is that a
 *     process-level supervisor (launchd with `KeepAlive`) restarts the
 *     server; all in-flight connections are lost.
 *
 * Nothing is done while `worker_in_job` is clear or while the timestamp is
 * 0: an idle worker does not advance `worker_last_progress`, and that must
 * not be read as a stall. Failures that happen before any job is dequeued
 * are therefore outside this watchdog's scope.
 *
 * All cross-thread fields are accessed with relaxed __atomic builtins.
 * Always returns NULL. */
static void *watchdog_main(void *arg) {
    server *srv = arg;

    for (;;) {
        if (g_stop_requested) break;
        sleep(5);
        if (g_stop_requested) break;

        /* No job in flight: a stale timestamp means nothing. */
        if (__atomic_load_n(&srv->worker_in_job, __ATOMIC_RELAXED) == 0) continue;

        /* The timestamp is 0 when no job has seeded it. */
        time_t last_seen = (time_t)__atomic_load_n(&srv->worker_last_progress, __ATOMIC_RELAXED);
        if (last_seen == 0) continue;

        time_t current = time(NULL);
        long silent_for = (long)(current - last_seen);

        /* Hard threshold is checked first so it always wins over soft. */
        if (silent_for >= srv->watchdog_stuck_hard_s) {
            server_log(DS4_LOG_DEFAULT,
                       "ds4-server: WATCHDOG hard stall %lds >= %ds — exiting with 137 so launchd can restart",
                       silent_for, srv->watchdog_stuck_hard_s);
            fflush(NULL);
            _exit(137);
        }

        if (silent_for < srv->watchdog_stuck_soft_s) continue;

        /* Abort already requested for this stall: do not log again. */
        if (__atomic_load_n(&srv->worker_abort_requested, __ATOMIC_RELAXED)) continue;

        server_log(DS4_LOG_DEFAULT,
                   "ds4-server: WATCHDOG soft stall %lds >= %ds — requesting decode-loop abort",
                   silent_for, srv->watchdog_stuck_soft_s);
        __atomic_store_n(&srv->worker_abort_requested, 1, __ATOMIC_RELAXED);
    }

    return NULL;
}

typedef struct {
char method[8];
char path[256];
Expand Down Expand Up @@ -11572,9 +11687,26 @@ int main(int argc, char **argv) {
server_log(DS4_LOG_DEFAULT, "ds4-server: tracing session to %s", cfg.trace_path);
}

/* Watchdog defaults: soft 60s (request graceful decode abort), hard
* 120s (process exit so launchd KeepAlive can restart us cleanly). */
if (s.watchdog_stuck_soft_s == 0) s.watchdog_stuck_soft_s = 60;
if (s.watchdog_stuck_hard_s == 0) s.watchdog_stuck_hard_s = 120;

pthread_t worker;
if (pthread_create(&worker, NULL, worker_main, &s) != 0) die("failed to start worker");

pthread_t watchdog;
if (pthread_create(&watchdog, NULL, watchdog_main, &s) != 0) {
server_log(DS4_LOG_DEFAULT,
"ds4-server: WARNING failed to start watchdog thread, continuing without stall protection");
} else {
s.watchdog_tid = watchdog;
s.watchdog_running = true;
server_log(DS4_LOG_DEFAULT,
"ds4-server: watchdog active (soft=%ds hard=%ds)",
s.watchdog_stuck_soft_s, s.watchdog_stuck_hard_s);
}

int lfd = listen_on(cfg.host, cfg.port);
if (lfd < 0) {
server_log(DS4_LOG_DEFAULT, "ds4-server: failed to listen on %s:%d: %s", cfg.host, cfg.port, strerror(errno));
Expand All @@ -11583,6 +11715,7 @@ int main(int argc, char **argv) {
pthread_cond_broadcast(&s.cv);
pthread_mutex_unlock(&s.mu);
pthread_join(worker, NULL);
if (s.watchdog_running) pthread_join(s.watchdog_tid, NULL);
server_close_resources(&s);
return 1;
}
Expand Down Expand Up @@ -11632,6 +11765,7 @@ int main(int argc, char **argv) {
pthread_cond_broadcast(&s.cv);
pthread_mutex_unlock(&s.mu);
pthread_join(worker, NULL);
if (s.watchdog_running) pthread_join(s.watchdog_tid, NULL);
pthread_mutex_lock(&s.mu);
while (s.clients > 0) pthread_cond_wait(&s.clients_cv, &s.mu);
pthread_mutex_unlock(&s.mu);
Expand Down