Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions crates/taskito-core/src/scheduler/result_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,24 @@ impl Scheduler {
cache.record(&task_name, wall_time_ns);
}

// Look up the job to get the queue name for middleware context
let queue = self
.storage
.get_job(&job_id)?
.as_ref()
.map(|j| j.queue.clone())
.unwrap_or_default();
// One fetch serves both the queue-context lookup and any
// subsequent DLQ move — there's no path that needs two reads.
let job = self.storage.get_job(&job_id)?;
let queue = job.as_ref().map(|j| j.queue.clone()).unwrap_or_default();

let move_to_dlq = |job: Option<&crate::job::Job>| -> Result<()> {
match job {
Some(j) => self.dlq.move_to_dlq(j, &error, None),
None => {
warn!("job {job_id} disappeared before DLQ move");
Ok(())
}
}
};

// If should_retry is false (exception filtering), skip straight to DLQ
if !should_retry {
match self.storage.get_job(&job_id)? {
Some(job) => self.dlq.move_to_dlq(&job, &error, None)?,
None => warn!("job {job_id} disappeared before DLQ move"),
}
move_to_dlq(job.as_ref())?;
return Ok(ResultOutcome::DeadLettered {
job_id,
task_name,
Expand Down Expand Up @@ -128,11 +132,7 @@ impl Scheduler {
timed_out,
})
} else {
// Move to DLQ
match self.storage.get_job(&job_id)? {
Some(job) => self.dlq.move_to_dlq(&job, &error, None)?,
None => warn!("job {job_id} disappeared before DLQ move"),
}
move_to_dlq(job.as_ref())?;
Ok(ResultOutcome::DeadLettered {
job_id,
task_name,
Expand Down
10 changes: 6 additions & 4 deletions crates/taskito-core/src/storage/redis_backend/archival.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ use redis::Commands;

use super::{map_err, RedisStorage};
use crate::error::{QueueError, Result};
use crate::job::Job;
use crate::job::{Job, JobStatus};

impl RedisStorage {
pub fn archive_old_jobs(&self, cutoff_ms: i64) -> Result<u64> {
let mut conn = self.conn()?;
let mut count = 0u64;

// Archive completed (2), dead (4), and cancelled (5) jobs
for status_int in [2, 4, 5] {
let status_key = self.key(&["jobs", "status", &status_int.to_string()]);
// Sourcing statuses from the enum guarantees that any future reorder
// or insertion in `JobStatus` doesn't silently change which buckets
// get archived.
for status in [JobStatus::Complete, JobStatus::Dead, JobStatus::Cancelled] {
let status_key = self.key(&["jobs", "status", &(status as i32).to_string()]);
let job_ids: Vec<String> = conn.smembers(&status_key).map_err(map_err)?;

for id in &job_ids {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::storage::redis_backend::{map_err, RedisStorage};
impl RedisStorage {
pub fn purge_completed(&self, older_than_ms: i64) -> Result<u64> {
let mut conn = self.conn()?;
let status_key = self.key(&["jobs", "status", "2"]); // Complete
let status_key = self.key(&["jobs", "status", &(JobStatus::Complete as i32).to_string()]);
let job_ids: Vec<String> = conn.smembers(&status_key).map_err(map_err)?;

let mut count = 0u64;
Expand All @@ -31,7 +31,7 @@ impl RedisStorage {
pub fn purge_completed_with_ttl(&self, global_cutoff_ms: i64) -> Result<u64> {
let now = now_millis();
let mut conn = self.conn()?;
let status_key = self.key(&["jobs", "status", "2"]); // Complete
let status_key = self.key(&["jobs", "status", &(JobStatus::Complete as i32).to_string()]);
let job_ids: Vec<String> = conn.smembers(&status_key).map_err(map_err)?;

let mut count = 0u64;
Expand All @@ -56,7 +56,7 @@ impl RedisStorage {

pub fn reap_stale_jobs(&self, now: i64) -> Result<Vec<Job>> {
let mut conn = self.conn()?;
let status_key = self.key(&["jobs", "status", "1"]); // Running
let status_key = self.key(&["jobs", "status", &(JobStatus::Running as i32).to_string()]);
let job_ids: Vec<String> = conn.smembers(&status_key).map_err(map_err)?;

let mut stale = Vec::new();
Expand All @@ -79,7 +79,7 @@ impl RedisStorage {

pub fn expire_pending_jobs(&self, now: i64) -> Result<u64> {
let mut conn = self.conn()?;
let status_key = self.key(&["jobs", "status", "0"]); // Pending
let status_key = self.key(&["jobs", "status", &(JobStatus::Pending as i32).to_string()]);
let job_ids: Vec<String> = conn.smembers(&status_key).map_err(map_err)?;

let mut count = 0u64;
Expand Down
20 changes: 18 additions & 2 deletions py_src/taskito/async_support/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,23 @@ def run_maybe_async(result: Any) -> Any:

Safe to call from any thread that does **not** already have a running
event loop (worker threads, main thread, daemon threads).

Raises:
RuntimeError: If called from a thread that already has a running
event loop. Use the async API (``a*`` methods, ``await`` the
coroutine directly, or run in a separate thread) instead.
"""
if asyncio.iscoroutine(result):
if not asyncio.iscoroutine(result):
return result

try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(result)
return result

raise RuntimeError(
"Cannot run an async resource factory or callable from a thread that "
"already has a running event loop. Use the corresponding async API "
"method (e.g. `aresult()`, `aenqueue()`), `await` the coroutine "
"directly, or invoke the sync API from a worker thread."
)
6 changes: 6 additions & 0 deletions py_src/taskito/async_support/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ async def aresult(
if status == "complete":
return value
if time.monotonic() >= deadline:
# A terminal failure can land between the poll and this branch
# (or during the storage read itself). Re-poll once so the
# caller sees the real exception class, not `TimeoutError`.
status, value = self._poll_once()
if status == "complete":
return value
raise TimeoutError(
f"Job {self.id} did not complete within {timeout}s (current status: {status})"
)
Expand Down
27 changes: 17 additions & 10 deletions py_src/taskito/resources/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,30 +77,37 @@ def acquire(self) -> Any:
f"Resource '{self._name}' pool timed out after {self._config.acquire_timeout}s"
)

# Try to reuse an idle instance under the lock.
with self._lock:
self._total_acquisitions += 1
self._active_count += 1
self._total_acquire_ms += (time.monotonic() - start) * 1000

# Try to reuse an idle instance
while self._idle:
instance, created_at = self._idle.popleft()
if time.monotonic() - created_at < self._config.max_lifetime:
self._record_acquired(start)
return instance
# Expired — teardown and try next
self._teardown_instance(instance)

# No idle instance available — create a new one
# No idle instance available — create a new one. We hold the
# semaphore permit for the whole try; on failure we just release
# it. `_active_count` is only incremented after we actually have
# an instance to hand out, so factory failure can never underflow
# the counter.
try:
instance = run_maybe_async(self._factory(**self._dep_kwargs))
return instance
except Exception:
# Release semaphore on creation failure
with self._lock:
self._active_count -= 1
self._semaphore.release()
raise

with self._lock:
self._record_acquired(start)
return instance

def _record_acquired(self, start: float) -> None:
"""Update bookkeeping for a successful acquire. Caller holds `_lock`."""
self._total_acquisitions += 1
self._active_count += 1
self._total_acquire_ms += (time.monotonic() - start) * 1000

def release(self, instance: Any) -> None:
"""Return an instance to the pool."""
with self._lock:
Expand Down
6 changes: 6 additions & 0 deletions py_src/taskito/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ def result(
if status == "complete":
return value
if time.monotonic() >= deadline:
# A terminal failure can land between the poll and this branch
# (or during the storage read itself). Re-poll once so the
# caller sees the real exception class, not `TimeoutError`.
status, value = self._poll_once()
if status == "complete":
return value
raise TimeoutError(
f"Job {self.id} did not complete within {timeout}s "
f"(current status: {self._py_job.status})"
Expand Down
41 changes: 41 additions & 0 deletions tests/python/test_resource_system_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,47 @@ def test_pool_stats(self) -> None:
assert s2["idle"] == 1
pool.shutdown()

def test_pool_factory_failure_does_not_underflow_active_count(self) -> None:
"""Active count must remain at zero (not negative) when the factory raises.

Regression: previously the increment ran before the factory call and
was decremented in the except branch. Any future re-ordering or
intervening release() risked underflowing the counter, surfacing as a
negative `active` in `stats()`.
"""

def boom() -> None:
raise RuntimeError("factory blew up")

pool = ResourcePool(
"test",
boom,
teardown=None,
config=PoolConfig(pool_size=2, acquire_timeout=0.1),
)

for _ in range(3):
with pytest.raises(RuntimeError, match="factory blew up"):
pool.acquire()

s = pool.stats()
assert s["active"] == 0
assert s["total_acquisitions"] == 0 # Failed attempts don't count
# Pool capacity must not leak: a fresh acquire after failures still works.
# (Use a successful factory now to confirm the semaphore wasn't consumed.)
ok_pool = ResourcePool(
"test2",
lambda: {"ok": True},
teardown=None,
config=PoolConfig(pool_size=1, acquire_timeout=0.1),
)
for _ in range(3):
inst = ok_pool.acquire()
ok_pool.release(inst)
assert ok_pool.stats()["active"] == 0
ok_pool.shutdown()
pool.shutdown()

def test_thread_local_store(self) -> None:
counter = {"n": 0}

Expand Down
62 changes: 62 additions & 0 deletions tests/python/test_result_race.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Regression: result()/aresult() must surface terminal-failure exceptions
even when the deadline is reached on the same iteration the failure lands.

Race scenario:
1. `_poll_once()` returns ("running", None) — snapshot from before the failure
2. The job transitions to `failed`/`dead`/`cancelled` in storage
3. `time.monotonic() >= deadline` — about to give up
4. Fix: re-poll once before raising `TimeoutError` so the caller sees the
real exception (TaskFailedError / MaxRetriesExceededError / ...).
"""

from typing import Any
from unittest.mock import patch

import pytest

from taskito import Queue
from taskito.exceptions import TaskFailedError


def _make_failing_poll(job_id: str) -> Any:
"""Returns a `_poll_once` stub: 1st call → running, 2nd call → TaskFailedError."""
call_count = [0]

def stub() -> tuple[str, Any]:
call_count[0] += 1
if call_count[0] == 1:
return ("running", None)
raise TaskFailedError(f"Job {job_id} failed: simulated late failure")

stub.calls = call_count # type: ignore[attr-defined]
return stub


def test_result_surfaces_terminal_failure_at_deadline(tmp_path: Any) -> None:
queue = Queue(db_path=str(tmp_path / "q.db"))

@queue.task()
def will_fail() -> None: ...

job = will_fail.delay()
poll = _make_failing_poll(job.id)

with patch.object(job, "_poll_once", side_effect=poll), pytest.raises(TaskFailedError):
job.result(timeout=0.01)

assert poll.calls[0] == 2, "second defensive poll must run before raising TimeoutError"


async def test_aresult_surfaces_terminal_failure_at_deadline(tmp_path: Any) -> None:
queue = Queue(db_path=str(tmp_path / "q.db"))

@queue.task()
async def will_fail() -> None: ...

job = will_fail.delay()
poll = _make_failing_poll(job.id)

with patch.object(job, "_poll_once", side_effect=poll), pytest.raises(TaskFailedError):
await job.aresult(timeout=0.01)

assert poll.calls[0] == 2, "second defensive poll must run before raising TimeoutError"
59 changes: 59 additions & 0 deletions tests/python/test_run_maybe_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Tests for `run_maybe_async` — detection of running event loop."""

from __future__ import annotations

import asyncio
from typing import Any

import pytest

from taskito.async_support.helpers import run_maybe_async


def test_run_maybe_async_passes_through_non_coroutine() -> None:
assert run_maybe_async(42) == 42
assert run_maybe_async("hello") == "hello"
assert run_maybe_async(None) is None


def test_run_maybe_async_runs_coroutine_in_sync_context() -> None:
async def make_value() -> int:
return 7

assert run_maybe_async(make_value()) == 7


async def test_run_maybe_async_raises_clear_error_under_running_loop() -> None:
"""Pytest-asyncio puts us in a running loop — must surface the taskito error."""

async def make_value() -> int:
return 1

coro: Any = make_value()
with pytest.raises(RuntimeError, match="async API"):
run_maybe_async(coro)
# Drain to silence the "coroutine was never awaited" warning.
coro.close()


async def test_run_maybe_async_async_message_mentions_a_methods() -> None:
async def make_value() -> int:
return 1

coro: Any = make_value()
try:
run_maybe_async(coro)
except RuntimeError as exc:
assert "aresult" in str(exc) or "aenqueue" in str(exc) or "await" in str(exc)
finally:
coro.close()


def test_run_maybe_async_no_loop_uses_asyncio_run() -> None:
"""Sanity: a fresh thread with no loop should run a coroutine to completion."""

async def slow() -> str:
await asyncio.sleep(0.001)
return "ok"

assert run_maybe_async(slow()) == "ok"
Loading