diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index 69009ff..328b71d 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -686,6 +686,73 @@ mod tests { assert_eq!(pending.len(), 1); } + #[test] + fn test_try_dispatch_reschedules_on_closed_channel() { + // Regression: when the worker channel is closed (worker pool + // shutting down) the job has already been claimed in storage — + // dropping it without rolling back leaves it in `Running` until + // the stale-reaper times it out, which surfaces as a *timeout* + // in metrics. The poller must clear the claim and reset the job + // to `Pending` so the next dispatch attempt picks it up cleanly. + let scheduler = test_scheduler(); + let job = scheduler + .storage + .enqueue(make_job("ch_closed_task")) + .unwrap(); + + let (tx, rx) = make_channel(16); + drop(rx); + + let mut counters = TickCounters::default(); + scheduler.tick(&tx, &mut counters); + + let after = scheduler.storage.get_job(&job.id).unwrap().unwrap(); + assert_eq!( + after.status, + JobStatus::Pending, + "job must be returned to Pending when dispatch fails" + ); + assert!(after.scheduled_at > now_millis()); + + let claims = scheduler + .storage + .list_claims_by_worker("scheduler") + .unwrap(); + assert!( + !claims.contains(&job.id), + "execution claim must be cleared on dispatch failure" + ); + } + + #[test] + fn test_try_dispatch_reschedules_on_full_channel() { + // Same regression when the channel is *full* rather than closed — + // the worker pool is behind but still alive. The job must come back + // to Pending so the next tick has a chance to dispatch it once the + // pool drains. + let scheduler = test_scheduler(); + let job = scheduler.storage.enqueue(make_job("ch_full_task")).unwrap(); + + // Capacity-1 channel pre-filled with a sentinel job. The poller's + // `try_send` will see `TrySendError::Full`. + let (tx, _rx) = make_channel(1); + let sentinel = scheduler.storage.enqueue(make_job("sentinel")).unwrap(); + tx.try_send(sentinel).expect("pre-fill should succeed"); + + let mut counters = TickCounters::default(); + scheduler.tick(&tx, &mut counters); + + let after = scheduler.storage.get_job(&job.id).unwrap().unwrap(); + assert_eq!(after.status, JobStatus::Pending); + assert!(after.scheduled_at > now_millis()); + + let claims = scheduler + .storage + .list_claims_by_worker("scheduler") + .unwrap(); + assert!(!claims.contains(&job.id)); + } + #[test] fn test_reap_stale_jobs() { let mut scheduler = test_scheduler(); diff --git a/crates/taskito-core/src/scheduler/poller.rs b/crates/taskito-core/src/scheduler/poller.rs index 90bcbf0..666a6c1 100644 --- a/crates/taskito-core/src/scheduler/poller.rs +++ b/crates/taskito-core/src/scheduler/poller.rs @@ -1,6 +1,7 @@ use std::time::Duration; use log::warn; +use tokio::sync::mpsc::error::TrySendError; use crate::error::Result; use crate::job::{now_millis, Job}; @@ -17,6 +18,11 @@ const RATE_LIMIT_RETRY_DELAY_MS: i64 = 1000; /// Delay before re-scheduling a concurrency-limited job (ms). const CONCURRENCY_RETRY_DELAY_MS: i64 = 500; +/// Delay before re-scheduling a job whose dispatch was rejected because the +/// worker channel was full or closed. Short enough that the worker pool +/// catches up on the next tick rather than waiting for the stale-job reaper. +const CHANNEL_BACKPRESSURE_RETRY_DELAY_MS: i64 = 100; + /// Worker identifier recorded on execution claims taken by the scheduler. const SCHEDULER_CLAIM_OWNER: &str = "scheduler"; @@ -65,11 +71,26 @@ impl Scheduler { return Ok(false); } - if job_tx.try_send(job).is_err() { - warn!("worker channel full or closed"); + // Hand the job to the worker pool. If the channel is unavailable we + // must reverse the claim — otherwise the job sits in `Running` until + // the stale-reaper times it out, which surfaces as a *timeout* in + // metrics and middleware (wrong outcome for a job that never ran). + let job_id = job.id.clone(); + match job_tx.try_send(job) { + Ok(()) => Ok(true), + Err(TrySendError::Full(_)) => { + warn!("worker channel full; rescheduling job {job_id} (worker pool is behind)",); + self.rollback_claim_and_retry(&job_id, now + CHANNEL_BACKPRESSURE_RETRY_DELAY_MS)?; + Ok(false) + } + Err(TrySendError::Closed(_)) => { + warn!( + "worker channel closed; rescheduling job {job_id} (worker pool shutting down)", + ); + self.rollback_claim_and_retry(&job_id, now + CHANNEL_BACKPRESSURE_RETRY_DELAY_MS)?; + Ok(false) + } } - - Ok(true) } /// Snapshot the queue list with paused queues filtered out. The paused