diff --git a/crates/taskito-core/src/scheduler/result_handler.rs b/crates/taskito-core/src/scheduler/result_handler.rs index eda6714..3d57339 100644 --- a/crates/taskito-core/src/scheduler/result_handler.rs +++ b/crates/taskito-core/src/scheduler/result_handler.rs @@ -81,20 +81,24 @@ impl Scheduler { cache.record(&task_name, wall_time_ns); } - // Look up the job to get the queue name for middleware context - let queue = self - .storage - .get_job(&job_id)? - .as_ref() - .map(|j| j.queue.clone()) - .unwrap_or_default(); + // One fetch serves both the queue-context lookup and any + // subsequent DLQ move — there's no path that needs two reads. + let job = self.storage.get_job(&job_id)?; + let queue = job.as_ref().map(|j| j.queue.clone()).unwrap_or_default(); + + let move_to_dlq = |job: Option<&crate::job::Job>| -> Result<()> { + match job { + Some(j) => self.dlq.move_to_dlq(j, &error, None), + None => { + warn!("job {job_id} disappeared before DLQ move"); + Ok(()) + } + } + }; // If should_retry is false (exception filtering), skip straight to DLQ if !should_retry { - match self.storage.get_job(&job_id)? { - Some(job) => self.dlq.move_to_dlq(&job, &error, None)?, - None => warn!("job {job_id} disappeared before DLQ move"), - } + move_to_dlq(job.as_ref())?; return Ok(ResultOutcome::DeadLettered { job_id, task_name, @@ -128,11 +132,7 @@ impl Scheduler { timed_out, }) } else { - // Move to DLQ - match self.storage.get_job(&job_id)? { - Some(job) => self.dlq.move_to_dlq(&job, &error, None)?, - None => warn!("job {job_id} disappeared before DLQ move"), - } + move_to_dlq(job.as_ref())?; Ok(ResultOutcome::DeadLettered { job_id, task_name, diff --git a/crates/taskito-core/src/storage/redis_backend/archival.rs b/crates/taskito-core/src/storage/redis_backend/archival.rs index bcf8c35..967985e 100644 --- a/crates/taskito-core/src/storage/redis_backend/archival.rs +++ b/crates/taskito-core/src/storage/redis_backend/archival.rs @@ -2,16 +2,18 @@ use redis::Commands; use super::{map_err, RedisStorage}; use crate::error::{QueueError, Result}; -use crate::job::Job; +use crate::job::{Job, JobStatus}; impl RedisStorage { pub fn archive_old_jobs(&self, cutoff_ms: i64) -> Result { let mut conn = self.conn()?; let mut count = 0u64; - // Archive completed (2), dead (4), and cancelled (5) jobs - for status_int in [2, 4, 5] { - let status_key = self.key(&["jobs", "status", &status_int.to_string()]); + // Sourcing statuses from the enum guarantees that any future reorder + // or insertion in `JobStatus` doesn't silently change which buckets + // get archived. + for status in [JobStatus::Complete, JobStatus::Dead, JobStatus::Cancelled] { + let status_key = self.key(&["jobs", "status", &(status as i32).to_string()]); let job_ids: Vec = conn.smembers(&status_key).map_err(map_err)?; for id in &job_ids { diff --git a/crates/taskito-core/src/storage/redis_backend/jobs/maintenance.rs b/crates/taskito-core/src/storage/redis_backend/jobs/maintenance.rs index 40df9e0..a2e3b39 100644 --- a/crates/taskito-core/src/storage/redis_backend/jobs/maintenance.rs +++ b/crates/taskito-core/src/storage/redis_backend/jobs/maintenance.rs @@ -10,7 +10,7 @@ use crate::storage::redis_backend::{map_err, RedisStorage}; impl RedisStorage { pub fn purge_completed(&self, older_than_ms: i64) -> Result { let mut conn = self.conn()?; - let status_key = self.key(&["jobs", "status", "2"]); // Complete + let status_key = self.key(&["jobs", "status", &(JobStatus::Complete as i32).to_string()]); let job_ids: Vec = conn.smembers(&status_key).map_err(map_err)?; let mut count = 0u64; @@ -31,7 +31,7 @@ impl RedisStorage { pub fn purge_completed_with_ttl(&self, global_cutoff_ms: i64) -> Result { let now = now_millis(); let mut conn = self.conn()?; - let status_key = self.key(&["jobs", "status", "2"]); // Complete + let status_key = self.key(&["jobs", "status", &(JobStatus::Complete as i32).to_string()]); let job_ids: Vec = conn.smembers(&status_key).map_err(map_err)?; let mut count = 0u64; @@ -56,7 +56,7 @@ impl RedisStorage { pub fn reap_stale_jobs(&self, now: i64) -> Result> { let mut conn = self.conn()?; - let status_key = self.key(&["jobs", "status", "1"]); // Running + let status_key = self.key(&["jobs", "status", &(JobStatus::Running as i32).to_string()]); let job_ids: Vec = conn.smembers(&status_key).map_err(map_err)?; let mut stale = Vec::new(); @@ -79,7 +79,7 @@ impl RedisStorage { pub fn expire_pending_jobs(&self, now: i64) -> Result { let mut conn = self.conn()?; - let status_key = self.key(&["jobs", "status", "0"]); // Pending + let status_key = self.key(&["jobs", "status", &(JobStatus::Pending as i32).to_string()]); let job_ids: Vec = conn.smembers(&status_key).map_err(map_err)?; let mut count = 0u64; diff --git a/py_src/taskito/async_support/helpers.py b/py_src/taskito/async_support/helpers.py index f08bda8..d70da5f 100644 --- a/py_src/taskito/async_support/helpers.py +++ b/py_src/taskito/async_support/helpers.py @@ -11,7 +11,23 @@ def run_maybe_async(result: Any) -> Any: Safe to call from any thread that does **not** already have a running event loop (worker threads, main thread, daemon threads). + + Raises: + RuntimeError: If called from a thread that already has a running + event loop. Use the async API (``a*`` methods, ``await`` the + coroutine directly, or run in a separate thread) instead. """ - if asyncio.iscoroutine(result): + if not asyncio.iscoroutine(result): + return result + + try: + asyncio.get_running_loop() + except RuntimeError: return asyncio.run(result) - return result + + raise RuntimeError( + "Cannot run an async resource factory or callable from a thread that " + "already has a running event loop. Use the corresponding async API " + "method (e.g. `aresult()`, `aenqueue()`), `await` the coroutine " + "directly, or invoke the sync API from a worker thread." + ) diff --git a/py_src/taskito/async_support/result.py b/py_src/taskito/async_support/result.py index 6b976f0..9c9e4ef 100644 --- a/py_src/taskito/async_support/result.py +++ b/py_src/taskito/async_support/result.py @@ -69,6 +69,12 @@ async def aresult( if status == "complete": return value if time.monotonic() >= deadline: + # A terminal failure can land between the poll and this branch + # (or during the storage read itself). Re-poll once so the + # caller sees the real exception class, not `TimeoutError`. + status, value = self._poll_once() + if status == "complete": + return value raise TimeoutError( f"Job {self.id} did not complete within {timeout}s (current status: {status})" ) diff --git a/py_src/taskito/resources/pool.py b/py_src/taskito/resources/pool.py index e138c4f..db49019 100644 --- a/py_src/taskito/resources/pool.py +++ b/py_src/taskito/resources/pool.py @@ -77,30 +77,37 @@ def acquire(self) -> Any: f"Resource '{self._name}' pool timed out after {self._config.acquire_timeout}s" ) + # Try to reuse an idle instance under the lock. with self._lock: - self._total_acquisitions += 1 - self._active_count += 1 - self._total_acquire_ms += (time.monotonic() - start) * 1000 - - # Try to reuse an idle instance while self._idle: instance, created_at = self._idle.popleft() if time.monotonic() - created_at < self._config.max_lifetime: + self._record_acquired(start) return instance # Expired — teardown and try next self._teardown_instance(instance) - # No idle instance available — create a new one + # No idle instance available — create a new one. We hold the + # semaphore permit for the whole try; on failure we just release + # it. `_active_count` is only incremented after we actually have + # an instance to hand out, so factory failure can never underflow + # the counter. try: instance = run_maybe_async(self._factory(**self._dep_kwargs)) - return instance except Exception: - # Release semaphore on creation failure - with self._lock: - self._active_count -= 1 self._semaphore.release() raise + with self._lock: + self._record_acquired(start) + return instance + + def _record_acquired(self, start: float) -> None: + """Update bookkeeping for a successful acquire. Caller holds `_lock`.""" + self._total_acquisitions += 1 + self._active_count += 1 + self._total_acquire_ms += (time.monotonic() - start) * 1000 + def release(self, instance: Any) -> None: """Return an instance to the pool.""" with self._lock: diff --git a/py_src/taskito/result.py b/py_src/taskito/result.py index 78e4122..3a46f0e 100644 --- a/py_src/taskito/result.py +++ b/py_src/taskito/result.py @@ -153,6 +153,12 @@ def result( if status == "complete": return value if time.monotonic() >= deadline: + # A terminal failure can land between the poll and this branch + # (or during the storage read itself). Re-poll once so the + # caller sees the real exception class, not `TimeoutError`. + status, value = self._poll_once() + if status == "complete": + return value raise TimeoutError( f"Job {self.id} did not complete within {timeout}s " f"(current status: {self._py_job.status})" diff --git a/tests/python/test_resource_system_full.py b/tests/python/test_resource_system_full.py index 61fb9aa..084339d 100644 --- a/tests/python/test_resource_system_full.py +++ b/tests/python/test_resource_system_full.py @@ -260,6 +260,47 @@ def test_pool_stats(self) -> None: assert s2["idle"] == 1 pool.shutdown() + def test_pool_factory_failure_does_not_underflow_active_count(self) -> None: + """Active count must remain at zero (not negative) when the factory raises. + + Regression: previously the increment ran before the factory call and + was decremented in the except branch. Any future re-ordering or + intervening release() risked underflowing the counter, surfacing as a + negative `active` in `stats()`. + """ + + def boom() -> None: + raise RuntimeError("factory blew up") + + pool = ResourcePool( + "test", + boom, + teardown=None, + config=PoolConfig(pool_size=2, acquire_timeout=0.1), + ) + + for _ in range(3): + with pytest.raises(RuntimeError, match="factory blew up"): + pool.acquire() + + s = pool.stats() + assert s["active"] == 0 + assert s["total_acquisitions"] == 0 # Failed attempts don't count + # Pool capacity must not leak: a fresh acquire after failures still works. + # (Use a successful factory now to confirm the semaphore wasn't consumed.) + ok_pool = ResourcePool( + "test2", + lambda: {"ok": True}, + teardown=None, + config=PoolConfig(pool_size=1, acquire_timeout=0.1), + ) + for _ in range(3): + inst = ok_pool.acquire() + ok_pool.release(inst) + assert ok_pool.stats()["active"] == 0 + ok_pool.shutdown() + pool.shutdown() + def test_thread_local_store(self) -> None: counter = {"n": 0} diff --git a/tests/python/test_result_race.py b/tests/python/test_result_race.py new file mode 100644 index 0000000..64e117d --- /dev/null +++ b/tests/python/test_result_race.py @@ -0,0 +1,62 @@ +"""Regression: result()/aresult() must surface terminal-failure exceptions +even when the deadline is reached on the same iteration the failure lands. + +Race scenario: +1. `_poll_once()` returns ("running", None) — snapshot from before the failure +2. The job transitions to `failed`/`dead`/`cancelled` in storage +3. `time.monotonic() >= deadline` — about to give up +4. Fix: re-poll once before raising `TimeoutError` so the caller sees the + real exception (TaskFailedError / MaxRetriesExceededError / ...). +""" + +from typing import Any +from unittest.mock import patch + +import pytest + +from taskito import Queue +from taskito.exceptions import TaskFailedError + + +def _make_failing_poll(job_id: str) -> Any: + """Returns a `_poll_once` stub: 1st call → running, 2nd call → TaskFailedError.""" + call_count = [0] + + def stub() -> tuple[str, Any]: + call_count[0] += 1 + if call_count[0] == 1: + return ("running", None) + raise TaskFailedError(f"Job {job_id} failed: simulated late failure") + + stub.calls = call_count # type: ignore[attr-defined] + return stub + + +def test_result_surfaces_terminal_failure_at_deadline(tmp_path: Any) -> None: + queue = Queue(db_path=str(tmp_path / "q.db")) + + @queue.task() + def will_fail() -> None: ... + + job = will_fail.delay() + poll = _make_failing_poll(job.id) + + with patch.object(job, "_poll_once", side_effect=poll), pytest.raises(TaskFailedError): + job.result(timeout=0.01) + + assert poll.calls[0] == 2, "second defensive poll must run before raising TimeoutError" + + +async def test_aresult_surfaces_terminal_failure_at_deadline(tmp_path: Any) -> None: + queue = Queue(db_path=str(tmp_path / "q.db")) + + @queue.task() + async def will_fail() -> None: ... + + job = will_fail.delay() + poll = _make_failing_poll(job.id) + + with patch.object(job, "_poll_once", side_effect=poll), pytest.raises(TaskFailedError): + await job.aresult(timeout=0.01) + + assert poll.calls[0] == 2, "second defensive poll must run before raising TimeoutError" diff --git a/tests/python/test_run_maybe_async.py b/tests/python/test_run_maybe_async.py new file mode 100644 index 0000000..89f99bd --- /dev/null +++ b/tests/python/test_run_maybe_async.py @@ -0,0 +1,59 @@ +"""Tests for `run_maybe_async` — detection of running event loop.""" + +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from taskito.async_support.helpers import run_maybe_async + + +def test_run_maybe_async_passes_through_non_coroutine() -> None: + assert run_maybe_async(42) == 42 + assert run_maybe_async("hello") == "hello" + assert run_maybe_async(None) is None + + +def test_run_maybe_async_runs_coroutine_in_sync_context() -> None: + async def make_value() -> int: + return 7 + + assert run_maybe_async(make_value()) == 7 + + +async def test_run_maybe_async_raises_clear_error_under_running_loop() -> None: + """Pytest-asyncio puts us in a running loop — must surface the taskito error.""" + + async def make_value() -> int: + return 1 + + coro: Any = make_value() + with pytest.raises(RuntimeError, match="async API"): + run_maybe_async(coro) + # Drain to silence the "coroutine was never awaited" warning. + coro.close() + + +async def test_run_maybe_async_async_message_mentions_a_methods() -> None: + async def make_value() -> int: + return 1 + + coro: Any = make_value() + try: + run_maybe_async(coro) + except RuntimeError as exc: + assert "aresult" in str(exc) or "aenqueue" in str(exc) or "await" in str(exc) + finally: + coro.close() + + +def test_run_maybe_async_no_loop_uses_asyncio_run() -> None: + """Sanity: a fresh thread with no loop should run a coroutine to completion.""" + + async def slow() -> str: + await asyncio.sleep(0.001) + return "ok" + + assert run_maybe_async(slow()) == "ok"