Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions crates/taskito-core/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,73 @@ mod tests {
assert_eq!(pending.len(), 1);
}

#[test]
fn test_try_dispatch_reschedules_on_closed_channel() {
// Regression: when the worker channel is closed (worker pool
// shutting down) the job has already been claimed in storage —
// dropping it without rolling back leaves it in `Running` until
// the stale-reaper times it out, which surfaces as a *timeout*
// in metrics. The poller must clear the claim and reset the job
// to `Pending` so the next dispatch attempt picks it up cleanly.
let scheduler = test_scheduler();
let job = scheduler
.storage
.enqueue(make_job("ch_closed_task"))
.unwrap();

let (tx, rx) = make_channel(16);
drop(rx);

let mut counters = TickCounters::default();
scheduler.tick(&tx, &mut counters);

let after = scheduler.storage.get_job(&job.id).unwrap().unwrap();
assert_eq!(
after.status,
JobStatus::Pending,
"job must be returned to Pending when dispatch fails"
);
assert!(after.scheduled_at > now_millis());

let claims = scheduler
.storage
.list_claims_by_worker("scheduler")
.unwrap();
assert!(
!claims.contains(&job.id),
"execution claim must be cleared on dispatch failure"
);
}

#[test]
fn test_try_dispatch_reschedules_on_full_channel() {
// Same regression when the channel is *full* rather than closed —
// the worker pool is behind but still alive. The job must come back
// to Pending so the next tick has a chance to dispatch it once the
// pool drains.
let scheduler = test_scheduler();
let job = scheduler.storage.enqueue(make_job("ch_full_task")).unwrap();

// Capacity-1 channel pre-filled with a sentinel job. The poller's
// `try_send` will see `TrySendError::Full`.
let (tx, _rx) = make_channel(1);
let sentinel = scheduler.storage.enqueue(make_job("sentinel")).unwrap();
tx.try_send(sentinel).expect("pre-fill should succeed");

let mut counters = TickCounters::default();
scheduler.tick(&tx, &mut counters);

let after = scheduler.storage.get_job(&job.id).unwrap().unwrap();
assert_eq!(after.status, JobStatus::Pending);
assert!(after.scheduled_at > now_millis());

let claims = scheduler
.storage
.list_claims_by_worker("scheduler")
.unwrap();
assert!(!claims.contains(&job.id));
}

#[test]
fn test_reap_stale_jobs() {
let mut scheduler = test_scheduler();
Expand Down
29 changes: 25 additions & 4 deletions crates/taskito-core/src/scheduler/poller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use log::warn;
use tokio::sync::mpsc::error::TrySendError;

use crate::error::Result;
use crate::job::{now_millis, Job};
Expand All @@ -17,6 +18,11 @@ const RATE_LIMIT_RETRY_DELAY_MS: i64 = 1000;
/// Delay before re-scheduling a concurrency-limited job (ms).
const CONCURRENCY_RETRY_DELAY_MS: i64 = 500;

/// Delay before re-scheduling a job whose dispatch was rejected because the
/// worker channel was full or closed. Short enough that the worker pool
/// catches up on the next tick rather than waiting for the stale-job reaper.
const CHANNEL_BACKPRESSURE_RETRY_DELAY_MS: i64 = 100;

/// Worker identifier recorded on execution claims taken by the scheduler.
const SCHEDULER_CLAIM_OWNER: &str = "scheduler";

Expand Down Expand Up @@ -65,11 +71,26 @@ impl Scheduler {
return Ok(false);
}

if job_tx.try_send(job).is_err() {
warn!("worker channel full or closed");
// Hand the job to the worker pool. If the channel is unavailable we
// must reverse the claim — otherwise the job sits in `Running` until
// the stale-reaper times it out, which surfaces as a *timeout* in
// metrics and middleware (wrong outcome for a job that never ran).
let job_id = job.id.clone();
match job_tx.try_send(job) {
Ok(()) => Ok(true),
Err(TrySendError::Full(_)) => {
warn!("worker channel full; rescheduling job {job_id} (worker pool is behind)",);
self.rollback_claim_and_retry(&job_id, now + CHANNEL_BACKPRESSURE_RETRY_DELAY_MS)?;
Ok(false)
}
Err(TrySendError::Closed(_)) => {
warn!(
"worker channel closed; rescheduling job {job_id} (worker pool shutting down)",
);
self.rollback_claim_and_retry(&job_id, now + CHANNEL_BACKPRESSURE_RETRY_DELAY_MS)?;
Ok(false)
}
}

Ok(true)
}

/// Snapshot the queue list with paused queues filtered out. The paused
Expand Down
Loading