From bb2221a538b2764cdc2bed9d372a26cd39309591 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sat, 2 May 2026 11:07:18 +0530 Subject: [PATCH] fix(scheduler): make concurrency cap atomic with claim_execution Two distinct bugs in the per-task and per-queue concurrency gates: 1. The cap check ran *before* `claim_execution`, so two scheduler instances could read a count below the cap and both proceed past the gate before either recorded the new running job. Postgres (`SELECT FOR UPDATE SKIP LOCKED`) and Redis are fully concurrent and would hit this; SQLite was protected only by transaction serialization. 2. `dequeue_from` already transitions status to `Running` before the gate runs, so the running-count includes the just-dequeued job. The `>=` comparison meant `max_concurrent = N` actually allowed only N-1 jobs; `max_concurrent = 1` allowed zero. Move the cap check to *after* `claim_execution` succeeds, change `>=` to `>`, and add a rollback path that clears the claim row + retries the job when the post-claim gate rejects it. Split `try_dispatch` into named helpers so each step of the dispatch flow is self-documenting. Three regression tests cover the off-by-one and the rollback path. --- crates/taskito-core/src/scheduler/mod.rs | 125 ++++++++++++++ crates/taskito-core/src/scheduler/poller.rs | 173 +++++++++++++------- 2 files changed, 241 insertions(+), 57 deletions(-) diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index a91468f..69009ff 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -561,6 +561,131 @@ mod tests { assert!(jobs[0].scheduled_at > now_millis()); } + #[test] + fn test_try_dispatch_per_task_concurrency_allows_exactly_max() { + // Regression: with `max_concurrent = 2` we expect exactly 2 jobs to + // be dispatched, not `max - 1`. The `dequeue_from` step transitions + // status to `Running` before the cap check, so the count includes + // the just-dequeued job — the gate must use `>` rather than `>=`. + let mut scheduler = test_scheduler(); + scheduler.register_task( + "conc_task".to_string(), + TaskConfig { + retry_policy: RetryPolicy::default(), + rate_limit: None, + circuit_breaker: None, + max_concurrent: Some(2), + }, + ); + + for _ in 0..3 { + scheduler.storage.enqueue(make_job("conc_task")).unwrap(); + } + + let (tx, mut rx) = make_channel(16); + let mut counters = TickCounters::default(); + + scheduler.tick(&tx, &mut counters); + scheduler.tick(&tx, &mut counters); + scheduler.tick(&tx, &mut counters); + + let mut dispatched = 0; + while rx.try_recv().is_ok() { + dispatched += 1; + } + assert_eq!( + dispatched, 2, + "expected exactly max_concurrent jobs dispatched" + ); + + // The rejected job should be back to Pending with a future schedule + // and its execution-claim row cleared. + let pending = scheduler + .storage + .list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0, None) + .unwrap(); + assert_eq!(pending.len(), 1); + assert!(pending[0].scheduled_at > now_millis()); + + let claims = scheduler + .storage + .list_claims_by_worker("scheduler") + .unwrap(); + assert_eq!( + claims.len(), + 2, + "rejected job's claim row should have been rolled back" + ); + assert!( + !claims.contains(&pending[0].id), + "rejected job should not have a stale execution claim" + ); + } + + #[test] + fn test_try_dispatch_per_task_max_one_dispatches_one() { + // Regression: `max_concurrent = 1` must allow exactly one job to + // run. With the pre-fix `>=` check the running-count (which already + // includes the just-dequeued job) tripped the gate and the job was + // rescheduled — effectively `max_concurrent = 0`. + let mut scheduler = test_scheduler(); + scheduler.register_task( + "single_task".to_string(), + TaskConfig { + retry_policy: RetryPolicy::default(), + rate_limit: None, + circuit_breaker: None, + max_concurrent: Some(1), + }, + ); + + scheduler.storage.enqueue(make_job("single_task")).unwrap(); + + let (tx, mut rx) = make_channel(16); + let mut counters = TickCounters::default(); + scheduler.tick(&tx, &mut counters); + + assert!( + rx.try_recv().is_ok(), + "the single allowed concurrent job must dispatch" + ); + } + + #[test] + fn test_try_dispatch_per_queue_concurrency_allows_exactly_max() { + // Same regression for the queue-level cap. + let mut scheduler = test_scheduler(); + scheduler.register_queue_config( + "default".to_string(), + QueueConfig { + rate_limit: None, + max_concurrent: Some(2), + }, + ); + + for _ in 0..3 { + scheduler.storage.enqueue(make_job("queue_capped")).unwrap(); + } + + let (tx, mut rx) = make_channel(16); + let mut counters = TickCounters::default(); + scheduler.tick(&tx, &mut counters); + scheduler.tick(&tx, &mut counters); + scheduler.tick(&tx, &mut counters); + + let mut dispatched = 0; + while rx.try_recv().is_ok() { + dispatched += 1; + } + assert_eq!(dispatched, 2); + + let pending = scheduler + .storage + .list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0, None) + .unwrap(); + assert_eq!(pending.len(), 1); + } + #[test] fn test_reap_stale_jobs() { let mut scheduler = test_scheduler(); diff --git a/crates/taskito-core/src/scheduler/poller.rs b/crates/taskito-core/src/scheduler/poller.rs index aa48e7c..90bcbf0 100644 --- a/crates/taskito-core/src/scheduler/poller.rs +++ b/crates/taskito-core/src/scheduler/poller.rs @@ -17,36 +17,14 @@ const RATE_LIMIT_RETRY_DELAY_MS: i64 = 1000; /// Delay before re-scheduling a concurrency-limited job (ms). const CONCURRENCY_RETRY_DELAY_MS: i64 = 500; +/// Worker identifier recorded on execution claims taken by the scheduler. +const SCHEDULER_CLAIM_OWNER: &str = "scheduler"; + impl Scheduler { pub(super) fn try_dispatch(&self, job_tx: &tokio::sync::mpsc::Sender) -> Result { let now = now_millis(); - // Filter out paused queues (refresh cache every 1s) - let active_queues = { - let mut cache = self.paused_cache.lock().unwrap_or_else(|poisoned| { - warn!("paused_cache mutex was poisoned, recovering"); - poisoned.into_inner() - }); - if cache.1.elapsed() > Duration::from_secs(1) { - cache.0 = self - .storage - .list_paused_queues() - .unwrap_or_default() - .into_iter() - .collect(); - cache.1 = std::time::Instant::now(); - } - if cache.0.is_empty() { - self.queues.clone() - } else { - self.queues - .iter() - .filter(|q| !cache.0.contains(*q)) - .cloned() - .collect::>() - } - }; - + let active_queues = self.active_queues(); if active_queues.is_empty() { return Ok(false); } @@ -59,7 +37,72 @@ impl Scheduler { None => return Ok(false), }; - // Check queue-level limits + // Pre-claim soft gates: rate limits and circuit breaker. + // + // These don't need to be atomic with the claim — if two schedulers + // both pass these gates, the gate semantics still hold (each + // consumes its own token / observes the breaker independently). + if !self.check_pre_claim_gates(&job, now)? { + return Ok(false); + } + + // Claim exactly-once execution. After this point, the job is reserved + // for this scheduler instance. + if !self.claim_for_dispatch(&job)? { + return Ok(false); + } + + // Post-claim hard gate: concurrency caps must be checked AFTER the + // claim so two schedulers cannot both pass the cap. Status was + // already transitioned to `Running` by `dequeue_from`, so the + // running-count includes this job — use strict `>` to allow exactly + // `max_concurrent` jobs. + // + // If we exceed the cap, roll back: clear the claim row and reset + // status to `Pending` so the job can be dispatched again later. + if !self.check_post_claim_concurrency(&job)? { + self.rollback_claim_and_retry(&job.id, now + CONCURRENCY_RETRY_DELAY_MS)?; + return Ok(false); + } + + if job_tx.try_send(job).is_err() { + warn!("worker channel full or closed"); + } + + Ok(true) + } + + /// Snapshot the queue list with paused queues filtered out. The paused + /// list is cached for 1s to avoid hammering storage on every tick. + fn active_queues(&self) -> Vec { + let mut cache = self.paused_cache.lock().unwrap_or_else(|poisoned| { + warn!("paused_cache mutex was poisoned, recovering"); + poisoned.into_inner() + }); + if cache.1.elapsed() > Duration::from_secs(1) { + cache.0 = self + .storage + .list_paused_queues() + .unwrap_or_default() + .into_iter() + .collect(); + cache.1 = std::time::Instant::now(); + } + if cache.0.is_empty() { + self.queues.clone() + } else { + self.queues + .iter() + .filter(|q| !cache.0.contains(*q)) + .cloned() + .collect() + } + } + + /// Apply the pre-claim soft gates (queue rate limit, task circuit + /// breaker, task rate limit). Returns `Ok(true)` if the job may proceed + /// to claim, `Ok(false)` if it was rescheduled. + fn check_pre_claim_gates(&self, job: &Job, now: i64) -> Result { if let Some(qcfg) = self.queue_configs.get(&job.queue) { if let Some(ref rl_config) = qcfg.rate_limit { let key = format!("queue:{}", job.queue); @@ -69,17 +112,8 @@ impl Scheduler { return Ok(false); } } - if let Some(max_conc) = qcfg.max_concurrent { - let stats = self.storage.stats_by_queue(&job.queue)?; - if stats.running >= max_conc as i64 { - self.storage - .retry(&job.id, now + CONCURRENCY_RETRY_DELAY_MS)?; - return Ok(false); - } - } } - // Check circuit breaker for this task if let Some(config) = self.task_configs.get(&job.task_name) { if config.circuit_breaker.is_some() && !self.circuit_breaker.allow(&job.task_name)? { self.storage @@ -94,36 +128,61 @@ impl Scheduler { return Ok(false); } } - - // Check per-task concurrency limit - if let Some(max_conc) = config.max_concurrent { - let running = self.storage.count_running_by_task(&job.task_name)?; - if running >= max_conc as i64 { - self.storage - .retry(&job.id, now + CONCURRENCY_RETRY_DELAY_MS)?; - return Ok(false); - } - } } - // Claim exactly-once execution - match self.storage.claim_execution(&job.id, "scheduler") { - Ok(false) => { - // Already claimed by another worker — skip - return Ok(false); - } - Ok(true) => {} + Ok(true) + } + + /// Try to claim exactly-once execution. Returns `Ok(true)` if the claim + /// was taken (or recoverably failed and the caller should still attempt + /// dispatch), `Ok(false)` if the job was already claimed by another + /// scheduler. + fn claim_for_dispatch(&self, job: &Job) -> Result { + match self.storage.claim_execution(&job.id, SCHEDULER_CLAIM_OWNER) { + Ok(true) => Ok(true), + Ok(false) => Ok(false), Err(e) => { + // Don't drop the job on a transient claim error — proceed and + // let the worker handle the duplicate execution defensively. warn!("claim_execution error for job {}: {e}", job.id); - // Proceed anyway to avoid dropping the job + Ok(true) } } + } - // Dispatch to worker pool (non-blocking) - if job_tx.try_send(job).is_err() { - warn!("worker channel full or closed"); + /// Apply the post-claim hard gates (per-queue and per-task concurrency + /// caps). Returns `Ok(true)` if the job may proceed to dispatch, + /// `Ok(false)` if the cap is exceeded — caller is responsible for + /// rolling back the claim. + fn check_post_claim_concurrency(&self, job: &Job) -> Result { + if let Some(qcfg) = self.queue_configs.get(&job.queue) { + if let Some(max_conc) = qcfg.max_concurrent { + let stats = self.storage.stats_by_queue(&job.queue)?; + if stats.running > max_conc as i64 { + return Ok(false); + } + } + } + + if let Some(config) = self.task_configs.get(&job.task_name) { + if let Some(max_conc) = config.max_concurrent { + let running = self.storage.count_running_by_task(&job.task_name)?; + if running > max_conc as i64 { + return Ok(false); + } + } } Ok(true) } + + /// Reverse a successful `claim_execution` and reschedule the job. Used + /// when a post-claim gate rejects the job after the claim row has + /// already been written. + fn rollback_claim_and_retry(&self, job_id: &str, next_at: i64) -> Result<()> { + if let Err(e) = self.storage.complete_execution(job_id) { + warn!("failed to clear execution claim during rollback for job {job_id}: {e}"); + } + self.storage.retry(job_id, next_at) + } }