From 24a42cb5399f0ea9361e1c833561c5afdd683492 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sat, 2 May 2026 11:40:19 +0530 Subject: [PATCH] fix(scheduler): reschedule job when worker channel is full or closed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `try_dispatch` was logging the channel-send failure and returning `Ok(true)` as if the job had been dispatched. The job had already been moved to `Running` and its execution-claim row written, so it sat in that state until the stale-job reaper timed it out — at which point it was reported to middleware and metrics as a *timeout failure*, the wrong outcome for a job that never executed. Distinguish the two failure modes (channel full vs closed) with separate warnings so operators can tell backpressure from shutdown, and route both through `rollback_claim_and_retry` to clear the claim and reset status to `Pending` with a 100ms delay. The next tick will dispatch normally once the worker pool drains or restarts. Two regression tests cover the closed-channel and full-channel paths. --- crates/taskito-core/src/scheduler/mod.rs | 67 +++++++++++++++++++++ crates/taskito-core/src/scheduler/poller.rs | 29 +++++++-- 2 files changed, 92 insertions(+), 4 deletions(-) diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index 69009ff..328b71d 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -686,6 +686,73 @@ mod tests { assert_eq!(pending.len(), 1); } + #[test] + fn test_try_dispatch_reschedules_on_closed_channel() { + // Regression: when the worker channel is closed (worker pool + // shutting down) the job has already been claimed in storage — + // dropping it without rolling back leaves it in `Running` until + // the stale-reaper times it out, which surfaces as a *timeout* + // in metrics. The poller must clear the claim and reset the job + // to `Pending` so the next dispatch attempt picks it up cleanly. + let scheduler = test_scheduler(); + let job = scheduler + .storage + .enqueue(make_job("ch_closed_task")) + .unwrap(); + + let (tx, rx) = make_channel(16); + drop(rx); + + let mut counters = TickCounters::default(); + scheduler.tick(&tx, &mut counters); + + let after = scheduler.storage.get_job(&job.id).unwrap().unwrap(); + assert_eq!( + after.status, + JobStatus::Pending, + "job must be returned to Pending when dispatch fails" + ); + assert!(after.scheduled_at > now_millis()); + + let claims = scheduler + .storage + .list_claims_by_worker("scheduler") + .unwrap(); + assert!( + !claims.contains(&job.id), + "execution claim must be cleared on dispatch failure" + ); + } + + #[test] + fn test_try_dispatch_reschedules_on_full_channel() { + // Same regression when the channel is *full* rather than closed — + // the worker pool is behind but still alive. The job must come back + // to Pending so the next tick has a chance to dispatch it once the + // pool drains. + let scheduler = test_scheduler(); + let job = scheduler.storage.enqueue(make_job("ch_full_task")).unwrap(); + + // Capacity-1 channel pre-filled with a sentinel job. The poller's + // `try_send` will see `TrySendError::Full`. + let (tx, _rx) = make_channel(1); + let sentinel = scheduler.storage.enqueue(make_job("sentinel")).unwrap(); + tx.try_send(sentinel).expect("pre-fill should succeed"); + + let mut counters = TickCounters::default(); + scheduler.tick(&tx, &mut counters); + + let after = scheduler.storage.get_job(&job.id).unwrap().unwrap(); + assert_eq!(after.status, JobStatus::Pending); + assert!(after.scheduled_at > now_millis()); + + let claims = scheduler + .storage + .list_claims_by_worker("scheduler") + .unwrap(); + assert!(!claims.contains(&job.id)); + } + #[test] fn test_reap_stale_jobs() { let mut scheduler = test_scheduler(); diff --git a/crates/taskito-core/src/scheduler/poller.rs b/crates/taskito-core/src/scheduler/poller.rs index 90bcbf0..666a6c1 100644 --- a/crates/taskito-core/src/scheduler/poller.rs +++ b/crates/taskito-core/src/scheduler/poller.rs @@ -1,6 +1,7 @@ use std::time::Duration; use log::warn; +use tokio::sync::mpsc::error::TrySendError; use crate::error::Result; use crate::job::{now_millis, Job}; @@ -17,6 +18,11 @@ const RATE_LIMIT_RETRY_DELAY_MS: i64 = 1000; /// Delay before re-scheduling a concurrency-limited job (ms). const CONCURRENCY_RETRY_DELAY_MS: i64 = 500; +/// Delay before re-scheduling a job whose dispatch was rejected because the +/// worker channel was full or closed. Short enough that the worker pool +/// catches up on the next tick rather than waiting for the stale-job reaper. +const CHANNEL_BACKPRESSURE_RETRY_DELAY_MS: i64 = 100; + /// Worker identifier recorded on execution claims taken by the scheduler. const SCHEDULER_CLAIM_OWNER: &str = "scheduler"; @@ -65,11 +71,26 @@ impl Scheduler { return Ok(false); } - if job_tx.try_send(job).is_err() { - warn!("worker channel full or closed"); + // Hand the job to the worker pool. If the channel is unavailable we + // must reverse the claim — otherwise the job sits in `Running` until + // the stale-reaper times it out, which surfaces as a *timeout* in + // metrics and middleware (wrong outcome for a job that never ran). + let job_id = job.id.clone(); + match job_tx.try_send(job) { + Ok(()) => Ok(true), + Err(TrySendError::Full(_)) => { + warn!("worker channel full; rescheduling job {job_id} (worker pool is behind)",); + self.rollback_claim_and_retry(&job_id, now + CHANNEL_BACKPRESSURE_RETRY_DELAY_MS)?; + Ok(false) + } + Err(TrySendError::Closed(_)) => { + warn!( + "worker channel closed; rescheduling job {job_id} (worker pool shutting down)", + ); + self.rollback_claim_and_retry(&job_id, now + CHANNEL_BACKPRESSURE_RETRY_DELAY_MS)?; + Ok(false) + } } - - Ok(true) } /// Snapshot the queue list with paused queues filtered out. The paused