Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions crates/taskito-core/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,131 @@ mod tests {
assert!(jobs[0].scheduled_at > now_millis());
}

#[test]
fn test_try_dispatch_per_task_concurrency_allows_exactly_max() {
// Regression: with `max_concurrent = 2` we expect exactly 2 jobs to
// be dispatched, not `max - 1`. The `dequeue_from` step transitions
// status to `Running` before the cap check, so the count includes
// the just-dequeued job — the gate must use `>` rather than `>=`.
let mut scheduler = test_scheduler();
scheduler.register_task(
"conc_task".to_string(),
TaskConfig {
retry_policy: RetryPolicy::default(),
rate_limit: None,
circuit_breaker: None,
max_concurrent: Some(2),
},
);

for _ in 0..3 {
scheduler.storage.enqueue(make_job("conc_task")).unwrap();
}

let (tx, mut rx) = make_channel(16);
let mut counters = TickCounters::default();

scheduler.tick(&tx, &mut counters);
scheduler.tick(&tx, &mut counters);
scheduler.tick(&tx, &mut counters);

let mut dispatched = 0;
while rx.try_recv().is_ok() {
dispatched += 1;
}
assert_eq!(
dispatched, 2,
"expected exactly max_concurrent jobs dispatched"
);

// The rejected job should be back to Pending with a future schedule
// and its execution-claim row cleared.
let pending = scheduler
.storage
.list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0, None)
.unwrap();
assert_eq!(pending.len(), 1);
assert!(pending[0].scheduled_at > now_millis());

let claims = scheduler
.storage
.list_claims_by_worker("scheduler")
.unwrap();
assert_eq!(
claims.len(),
2,
"rejected job's claim row should have been rolled back"
);
assert!(
!claims.contains(&pending[0].id),
"rejected job should not have a stale execution claim"
);
}

#[test]
fn test_try_dispatch_per_task_max_one_dispatches_one() {
// Regression: `max_concurrent = 1` must allow exactly one job to
// run. With the pre-fix `>=` check the running-count (which already
// includes the just-dequeued job) tripped the gate and the job was
// rescheduled — effectively `max_concurrent = 0`.
let mut scheduler = test_scheduler();
scheduler.register_task(
"single_task".to_string(),
TaskConfig {
retry_policy: RetryPolicy::default(),
rate_limit: None,
circuit_breaker: None,
max_concurrent: Some(1),
},
);

scheduler.storage.enqueue(make_job("single_task")).unwrap();

let (tx, mut rx) = make_channel(16);
let mut counters = TickCounters::default();
scheduler.tick(&tx, &mut counters);

assert!(
rx.try_recv().is_ok(),
"the single allowed concurrent job must dispatch"
);
}

#[test]
fn test_try_dispatch_per_queue_concurrency_allows_exactly_max() {
// Same regression for the queue-level cap.
let mut scheduler = test_scheduler();
scheduler.register_queue_config(
"default".to_string(),
QueueConfig {
rate_limit: None,
max_concurrent: Some(2),
},
);

for _ in 0..3 {
scheduler.storage.enqueue(make_job("queue_capped")).unwrap();
}

let (tx, mut rx) = make_channel(16);
let mut counters = TickCounters::default();
scheduler.tick(&tx, &mut counters);
scheduler.tick(&tx, &mut counters);
scheduler.tick(&tx, &mut counters);

let mut dispatched = 0;
while rx.try_recv().is_ok() {
dispatched += 1;
}
assert_eq!(dispatched, 2);

let pending = scheduler
.storage
.list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0, None)
.unwrap();
assert_eq!(pending.len(), 1);
}

#[test]
fn test_reap_stale_jobs() {
let mut scheduler = test_scheduler();
Expand Down
173 changes: 116 additions & 57 deletions crates/taskito-core/src/scheduler/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,14 @@ const RATE_LIMIT_RETRY_DELAY_MS: i64 = 1000;
/// Delay before re-scheduling a concurrency-limited job (ms).
const CONCURRENCY_RETRY_DELAY_MS: i64 = 500;

/// Worker identifier recorded on execution claims taken by the scheduler.
const SCHEDULER_CLAIM_OWNER: &str = "scheduler";

impl Scheduler {
pub(super) fn try_dispatch(&self, job_tx: &tokio::sync::mpsc::Sender<Job>) -> Result<bool> {
let now = now_millis();

// Filter out paused queues (refresh cache every 1s)
let active_queues = {
let mut cache = self.paused_cache.lock().unwrap_or_else(|poisoned| {
warn!("paused_cache mutex was poisoned, recovering");
poisoned.into_inner()
});
if cache.1.elapsed() > Duration::from_secs(1) {
cache.0 = self
.storage
.list_paused_queues()
.unwrap_or_default()
.into_iter()
.collect();
cache.1 = std::time::Instant::now();
}
if cache.0.is_empty() {
self.queues.clone()
} else {
self.queues
.iter()
.filter(|q| !cache.0.contains(*q))
.cloned()
.collect::<Vec<_>>()
}
};

let active_queues = self.active_queues();
if active_queues.is_empty() {
return Ok(false);
}
Expand All @@ -59,7 +37,72 @@ impl Scheduler {
None => return Ok(false),
};

// Check queue-level limits
// Pre-claim soft gates: rate limits and circuit breaker.
//
// These don't need to be atomic with the claim — if two schedulers
// both pass these gates, the gate semantics still hold (each
// consumes its own token / observes the breaker independently).
if !self.check_pre_claim_gates(&job, now)? {
return Ok(false);
}

// Claim exactly-once execution. After this point, the job is reserved
// for this scheduler instance.
if !self.claim_for_dispatch(&job)? {
return Ok(false);
}

// Post-claim hard gate: concurrency caps must be checked AFTER the
// claim so two schedulers cannot both pass the cap. Status was
// already transitioned to `Running` by `dequeue_from`, so the
// running-count includes this job — use strict `>` to allow exactly
// `max_concurrent` jobs.
//
// If we exceed the cap, roll back: clear the claim row and reset
// status to `Pending` so the job can be dispatched again later.
if !self.check_post_claim_concurrency(&job)? {
self.rollback_claim_and_retry(&job.id, now + CONCURRENCY_RETRY_DELAY_MS)?;
return Ok(false);
}

if job_tx.try_send(job).is_err() {
warn!("worker channel full or closed");
}

Ok(true)
}

/// Snapshot the queue list with paused queues filtered out. The paused
/// list is cached for 1s to avoid hammering storage on every tick.
fn active_queues(&self) -> Vec<String> {
let mut cache = self.paused_cache.lock().unwrap_or_else(|poisoned| {
warn!("paused_cache mutex was poisoned, recovering");
poisoned.into_inner()
});
if cache.1.elapsed() > Duration::from_secs(1) {
cache.0 = self
.storage
.list_paused_queues()
.unwrap_or_default()
.into_iter()
.collect();
cache.1 = std::time::Instant::now();
}
if cache.0.is_empty() {
self.queues.clone()
} else {
self.queues
.iter()
.filter(|q| !cache.0.contains(*q))
.cloned()
.collect()
}
}

/// Apply the pre-claim soft gates (queue rate limit, task circuit
/// breaker, task rate limit). Returns `Ok(true)` if the job may proceed
/// to claim, `Ok(false)` if it was rescheduled.
fn check_pre_claim_gates(&self, job: &Job, now: i64) -> Result<bool> {
if let Some(qcfg) = self.queue_configs.get(&job.queue) {
if let Some(ref rl_config) = qcfg.rate_limit {
let key = format!("queue:{}", job.queue);
Expand All @@ -69,17 +112,8 @@ impl Scheduler {
return Ok(false);
}
}
if let Some(max_conc) = qcfg.max_concurrent {
let stats = self.storage.stats_by_queue(&job.queue)?;
if stats.running >= max_conc as i64 {
self.storage
.retry(&job.id, now + CONCURRENCY_RETRY_DELAY_MS)?;
return Ok(false);
}
}
}

// Check circuit breaker for this task
if let Some(config) = self.task_configs.get(&job.task_name) {
if config.circuit_breaker.is_some() && !self.circuit_breaker.allow(&job.task_name)? {
self.storage
Expand All @@ -94,36 +128,61 @@ impl Scheduler {
return Ok(false);
}
}

// Check per-task concurrency limit
if let Some(max_conc) = config.max_concurrent {
let running = self.storage.count_running_by_task(&job.task_name)?;
if running >= max_conc as i64 {
self.storage
.retry(&job.id, now + CONCURRENCY_RETRY_DELAY_MS)?;
return Ok(false);
}
}
}

// Claim exactly-once execution
match self.storage.claim_execution(&job.id, "scheduler") {
Ok(false) => {
// Already claimed by another worker — skip
return Ok(false);
}
Ok(true) => {}
Ok(true)
}

/// Try to claim exactly-once execution. Returns `Ok(true)` if the claim
/// was taken (or recoverably failed and the caller should still attempt
/// dispatch), `Ok(false)` if the job was already claimed by another
/// scheduler.
fn claim_for_dispatch(&self, job: &Job) -> Result<bool> {
match self.storage.claim_execution(&job.id, SCHEDULER_CLAIM_OWNER) {
Ok(true) => Ok(true),
Ok(false) => Ok(false),
Err(e) => {
// Don't drop the job on a transient claim error — proceed and
// let the worker handle the duplicate execution defensively.
warn!("claim_execution error for job {}: {e}", job.id);
// Proceed anyway to avoid dropping the job
Ok(true)
}
}
}

// Dispatch to worker pool (non-blocking)
if job_tx.try_send(job).is_err() {
warn!("worker channel full or closed");
/// Apply the post-claim hard gates (per-queue and per-task concurrency
/// caps). Returns `Ok(true)` if the job may proceed to dispatch,
/// `Ok(false)` if the cap is exceeded — caller is responsible for
/// rolling back the claim.
fn check_post_claim_concurrency(&self, job: &Job) -> Result<bool> {
if let Some(qcfg) = self.queue_configs.get(&job.queue) {
if let Some(max_conc) = qcfg.max_concurrent {
let stats = self.storage.stats_by_queue(&job.queue)?;
if stats.running > max_conc as i64 {
return Ok(false);
}
}
}

if let Some(config) = self.task_configs.get(&job.task_name) {
if let Some(max_conc) = config.max_concurrent {
let running = self.storage.count_running_by_task(&job.task_name)?;
if running > max_conc as i64 {
return Ok(false);
}
}
}

Ok(true)
}

/// Reverse a successful `claim_execution` and reschedule the job. Used
/// when a post-claim gate rejects the job after the claim row has
/// already been written.
fn rollback_claim_and_retry(&self, job_id: &str, next_at: i64) -> Result<()> {
if let Err(e) = self.storage.complete_execution(job_id) {
warn!("failed to clear execution claim during rollback for job {job_id}: {e}");
}
self.storage.retry(job_id, next_at)
}
}
Loading