Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions ds4_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -9215,8 +9215,37 @@ typedef struct {
bool headers_sent;
bool stream_failed;
double last_keepalive;
pthread_t keepalive_thread;
volatile int keepalive_running;
} server_prefill_progress;

/* Background thread: sends SSE keepalive traffic on a wall-clock timer
 * so the connection survives prefill stalls that don't trigger the
 * progress callback (which only fires on chunk boundaries).
 *
 * arg is the server_prefill_progress owned by the thread that started
 * this worker. Roughly every 5 seconds, while p->stream is set, p->fd is
 * valid and the stream has not failed, the worker either sends the SSE
 * headers (if they have not been sent yet) or writes a ": prefill"
 * comment. Any send failure sets p->stream_failed, after which the worker
 * stops writing but keeps running until it is told to stop.
 *
 * Shutdown: the owner clears p->keepalive_running and then calls
 * pthread_join(). To keep that join from blocking for a whole keepalive
 * period, the wait is split into short slices and the flag is re-checked
 * after each one, so the worker exits within about one slice.
 *
 * NOTE(review): keepalive_running is a plain `volatile int`, which is not
 * a synchronization primitive; an atomic flag would be the robust choice.
 * NOTE(review): headers_sent / stream_failed and the fd are presumably
 * also used by the progress callback on the request thread, and no lock
 * is visible here — confirm the two writers cannot interleave output.
 *
 * Always returns NULL. */
static void *keepalive_worker(void *arg) {
    server_prefill_progress *p = (server_prefill_progress *)arg;
    enum {
        KEEPALIVE_SLICE_MS = 100,   /* granularity of the stop-flag poll */
        KEEPALIVE_PERIOD_MS = 5000  /* spacing between keepalive writes */
    };
    const struct timespec slice = {
        .tv_sec = 0,
        .tv_nsec = KEEPALIVE_SLICE_MS * 1000000L
    };
    int waited_ms = 0;

    while (p->keepalive_running) {
        /* Short sleep so a cleared flag is noticed quickly. An early
         * wake-up (e.g. a signal) is counted as a full slice, which can
         * only make a keepalive slightly early, never late. */
        nanosleep(&slice, NULL);
        if (!p->keepalive_running) break;

        waited_ms += KEEPALIVE_SLICE_MS;
        if (waited_ms < KEEPALIVE_PERIOD_MS) continue;
        waited_ms = 0;

        /* Nothing to write for non-streaming requests, a missing fd, or
         * a stream that already failed. */
        if (!p->stream || p->fd < 0 || p->stream_failed) continue;

        if (!p->headers_sent) {
            /* The first write on the stream must be the SSE headers. */
            if (sse_headers(p->fd, p->enable_cors)) {
                p->headers_sent = true;
            } else {
                p->stream_failed = true;
            }
        } else {
            /* SSE comment line: starts with ':' and ends with a blank line. */
            static const char ka[] = ": prefill\n\n";
            if (!send_all(p->fd, ka, sizeof(ka) - 1)) {
                p->stream_failed = true;
            }
        }
    }
    return NULL;
}

static void request_ctx_span(char *buf, size_t len, int cached, int prompt) {
int suffix = prompt - cached;
if (suffix < 0) suffix = 0;
Expand Down Expand Up @@ -9640,7 +9669,20 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct
};
snprintf(rebuild_progress.ctx, sizeof(rebuild_progress.ctx), "%s", rebuild_ctx);
ds4_session_set_progress(s->session, server_progress_cb, &rebuild_progress);
/* Start keepalive thread for rebuild sync. */
if (rebuild_progress.stream) {
rebuild_progress.keepalive_running = 1;
if (pthread_create(&rebuild_progress.keepalive_thread, NULL,
keepalive_worker, &rebuild_progress) != 0) {
rebuild_progress.keepalive_running = 0;
}
}
if (ds4_session_sync(s->session, sync_prompt, sync_err, sizeof(sync_err)) == 0) {
/* Stop keepalive thread. */
if (rebuild_progress.keepalive_running) {
rebuild_progress.keepalive_running = 0;
pthread_join(rebuild_progress.keepalive_thread, NULL);
}
ds4_session_set_progress(s->session, NULL, NULL);
const double rebuild_sec = now_sec() - rebuild_t0;
if (loaded > 0) {
Expand All @@ -9659,6 +9701,11 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct
common, live_len, canonical.len, err);
}
} else {
/* Stop keepalive thread on rebuild failure. */
if (rebuild_progress.keepalive_running) {
rebuild_progress.keepalive_running = 0;
pthread_join(rebuild_progress.keepalive_thread, NULL);
}
ds4_session_set_progress(s->session, NULL, NULL);
server_log(DS4_LOG_KVCACHE,
"ds4-server: tool checkpoint rebuild failed ctx=%s request_ctx=%s source=%s cached=%d replay=%d target=%d error=\"%s\"",
Expand Down Expand Up @@ -9908,6 +9955,16 @@ static void generate_job(server *s, job *j) {
req_flags);
ds4_session_set_progress(s->session, server_progress_cb, &progress);

/* Start background keepalive thread so SSE connections survive
* prefill stalls that don't trigger the progress callback. */
if (progress.stream) {
progress.keepalive_running = 1;
if (pthread_create(&progress.keepalive_thread, NULL,
keepalive_worker, &progress) != 0) {
progress.keepalive_running = 0;
}
}

int cold_store_len = 0;
if (cached == 0 &&
s->kv.enabled &&
Expand Down Expand Up @@ -9942,6 +9999,11 @@ static void generate_job(server *s, job *j) {
if (ds4_session_sync(s->session, &prefix, err, sizeof(err)) != 0) {
ds4_tokens_free(&prefix);
ds4_tokens_free(&effective_prompt);
/* Stop keepalive thread. */
if (progress.keepalive_running) {
progress.keepalive_running = 0;
pthread_join(progress.keepalive_thread, NULL);
}
ds4_session_set_progress(s->session, NULL, NULL);
kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last,
cold_store_len);
Expand All @@ -9962,6 +10024,11 @@ static void generate_job(server *s, job *j) {

if (ds4_session_sync(s->session, prompt_for_sync, err, sizeof(err)) != 0) {
ds4_tokens_free(&effective_prompt);
/* Stop keepalive thread before clearing progress callback. */
if (progress.keepalive_running) {
progress.keepalive_running = 0;
pthread_join(progress.keepalive_thread, NULL);
}
ds4_session_set_progress(s->session, NULL, NULL);
kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last,
cold_store_len);
Expand All @@ -9974,6 +10041,11 @@ static void generate_job(server *s, job *j) {
if (!responses_live_continuation) responses_live_clear(s);
if (!anthropic_live_continuation) anthropic_live_clear(s);
if (!thinking_live_continuation) thinking_live_clear(s);
/* Stop keepalive thread before clearing progress callback. */
if (progress.keepalive_running) {
progress.keepalive_running = 0;
pthread_join(progress.keepalive_thread, NULL);
}
ds4_session_set_progress(s->session, NULL, NULL);
kv_cache_maybe_store_continued(s);
server_log(DS4_LOG_PREFILL,
Expand Down