From 14291a23355a682314183b1263e2da7b9e8ed6fe Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:04:36 +0530 Subject: [PATCH 1/5] fix(prefork): kill children that exceed per-job timeout (#81) --- crates/taskito-python/src/prefork/child.rs | 13 +- crates/taskito-python/src/prefork/mod.rs | 312 ++++++++++++------ crates/taskito-python/src/prefork/slot.rs | 62 ++++ crates/taskito-python/src/prefork/watchdog.rs | 93 ++++++ 4 files changed, 387 insertions(+), 93 deletions(-) create mode 100644 crates/taskito-python/src/prefork/slot.rs create mode 100644 crates/taskito-python/src/prefork/watchdog.rs diff --git a/crates/taskito-python/src/prefork/child.rs b/crates/taskito-python/src/prefork/child.rs index d2a0540..00b449e 100644 --- a/crates/taskito-python/src/prefork/child.rs +++ b/crates/taskito-python/src/prefork/child.rs @@ -56,11 +56,22 @@ pub struct ChildProcess { impl ChildProcess { /// Check if the child process is still alive. - #[allow(dead_code)] pub fn is_alive(&mut self) -> bool { matches!(self.process.try_wait(), Ok(None)) } + /// `SIGKILL` the child and reap the zombie. + /// + /// Both calls are best-effort: the child may have already exited (e.g. + /// crashed) between the watchdog's deadline scan and this call, in which + /// case `kill` returns `EPERM`/`ESRCH` and `wait` returns immediately. + /// After this returns, `is_alive()` is guaranteed to be `false`, so the + /// dispatcher's respawn path will pick the slot up on the next job. + pub fn kill_and_reap(&mut self) { + let _ = self.process.kill(); + let _ = self.process.wait(); + } + /// Wait for the child to exit, with a timeout. Kills if it doesn't exit in time. pub fn wait_or_kill(&mut self, timeout: std::time::Duration) { let start = std::time::Instant::now(); diff --git a/crates/taskito-python/src/prefork/mod.rs b/crates/taskito-python/src/prefork/mod.rs index e60090b..ce2bab4 100644 --- a/crates/taskito-python/src/prefork/mod.rs +++ b/crates/taskito-python/src/prefork/mod.rs @@ -8,15 +8,20 @@ //! Architecture: //! - One dispatch thread: receives `Job` from scheduler, sends to children via stdin //! - N reader threads: one per child, reads results from stdout, sends to `result_tx` +//! - One watchdog thread: enforces per-job timeouts by `SIGKILL`-ing children +//! whose deadlines pass //! - Child processes: run `python -m taskito.prefork ` mod child; mod dispatch; pub mod protocol; +mod slot; +mod watchdog; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -use std::sync::Arc; -use std::thread; +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; use async_trait::async_trait; use crossbeam_channel::Sender; @@ -25,8 +30,13 @@ use taskito_core::job::Job; use taskito_core::scheduler::JobResult; use taskito_core::worker::WorkerDispatcher; -use child::{spawn_child, ChildWriter}; +use child::{spawn_child, ChildProcess, ChildReader, ChildWriter}; use protocol::ParentMessage; +use slot::{ActiveJob, SlotState}; + +/// How long graceful shutdown will wait for each child to drain before +/// sending `SIGKILL`. +const SHUTDOWN_DRAIN: Duration = Duration::from_secs(30); /// Multi-process worker pool that dispatches jobs to child Python processes. pub struct PreforkPool { @@ -57,100 +67,88 @@ impl WorkerDispatcher for PreforkPool { result_tx: Sender, ) { let num_workers = self.num_workers; - let app_path = self.app_path.clone(); - let python = self.python.clone(); - let shutdown = &self.shutdown; - // Spawn all children and split into writers + readers - let mut writers: Vec = Vec::with_capacity(num_workers); + // Shared per-child state. + let slots: SlotState = slot::new_slots(num_workers); let in_flight: Arc> = Arc::new((0..num_workers).map(|_| AtomicU32::new(0)).collect()); - let mut reader_handles: Vec> = Vec::new(); - let mut process_handles: Vec = Vec::new(); - - for i in 0..num_workers { - match spawn_child(&python, &app_path) { - Ok((writer, mut reader, process)) => { - log::info!("[taskito] prefork child {i} ready"); - writers.push(writer); - process_handles.push(process); - - // Spawn a reader thread for this child - let tx = result_tx.clone(); - let in_flight_counter = in_flight.clone(); - let child_idx = i; - reader_handles.push(thread::spawn(move || { - loop { - match reader.read() { - Ok(msg) => { - if let Some(job_result) = msg.into_job_result() { - in_flight_counter[child_idx] - .fetch_sub(1, Ordering::Relaxed); - if tx.send(job_result).is_err() { - break; // result channel closed - } - } - } - Err(e) => { - log::warn!( - "[taskito] prefork child {child_idx} reader error: {e}" - ); - break; - } - } - } - })); - } - Err(e) => { - log::error!("[taskito] failed to spawn prefork child {i}: {e}"); - } + let processes: Arc>>> = + Arc::new((0..num_workers).map(|_| Mutex::new(None)).collect()); + + // Per-child writers stay on the dispatch thread. + let mut writers: Vec> = (0..num_workers).map(|_| None).collect(); + let mut reader_handles: Vec> = Vec::new(); + + // Initial spawn. + for idx in 0..num_workers { + if let Some(handle) = start_child( + idx, + &self.python, + &self.app_path, + &mut writers, + &processes, + &slots, + &in_flight, + &result_tx, + ) { + reader_handles.push(handle); } } - if writers.is_empty() { + if writers.iter().all(Option::is_none) { log::error!("[taskito] no prefork children started, aborting"); return; } log::info!( "[taskito] prefork pool running with {} children", - writers.len() + writers.iter().filter(|w| w.is_some()).count() + ); + + // Watchdog: kills children that exceed their per-job timeout. + let watchdog_shutdown = Arc::new(AtomicBool::new(false)); + let watchdog_handle = watchdog::spawn( + slots.clone(), + processes.clone(), + in_flight.clone(), + result_tx.clone(), + watchdog_shutdown.clone(), ); - // Dispatch loop: receive jobs from scheduler, send to least-loaded child + // Dispatch loop. let mut restart_count: u64 = 0; while let Some(job) = job_rx.recv().await { - if shutdown.load(Ordering::Relaxed) { + if self.shutdown.load(Ordering::Relaxed) { break; } - // Check for dead children and restart them - for i in 0..process_handles.len() { - if !process_handles[i].is_alive() { - log::warn!("[taskito] prefork child {i} died, restarting"); + // Bring back any children that have exited (crashed, killed by + // watchdog, OOM, etc.). + for idx in 0..num_workers { + let dead = match processes[idx].lock() { + Ok(mut guard) => match guard.as_mut() { + Some(p) => !p.is_alive(), + None => true, + }, + Err(_) => false, + }; + if dead { + log::warn!("[taskito] prefork child {idx} died, restarting"); restart_count += 1; - match spawn_child(&python, &app_path) { - Ok((writer, mut reader, process)) => { - writers[i] = writer; - process_handles[i] = process; - in_flight[i].store(0, Ordering::Relaxed); - let tx = result_tx.clone(); - let in_flight_counter = in_flight.clone(); - reader_handles.push(thread::spawn(move || { - while let Ok(msg) = reader.read() { - if let Some(job_result) = msg.into_job_result() { - in_flight_counter[i].fetch_sub(1, Ordering::Relaxed); - if tx.send(job_result).is_err() { - break; - } - } - } - })); - log::info!("[taskito] prefork child {i} restarted (total restarts: {restart_count})"); - } - Err(e) => { - log::error!("[taskito] failed to restart child {i}: {e}"); - } + if let Some(handle) = start_child( + idx, + &self.python, + &self.app_path, + &mut writers, + &processes, + &slots, + &in_flight, + &result_tx, + ) { + reader_handles.push(handle); + log::info!( + "[taskito] prefork child {idx} restarted (total restarts: {restart_count})" + ); } } } @@ -161,38 +159,168 @@ impl WorkerDispatcher for PreforkPool { .collect(); let idx = dispatch::least_loaded(&counts); - let msg = ParentMessage::from(&job); - if let Err(e) = writers[idx].send(&msg) { + let Some(writer) = writers[idx].as_mut() else { log::error!( - "[taskito] failed to send job {} to child {idx}: {e}", + "[taskito] no live writer for child {idx}, dropping job {}; will be reaped", job.id ); - // Job will be reaped by the scheduler's stale job reaper continue; + }; + + let active = ActiveJob { + job_id: job.id.clone(), + task_name: job.task_name.clone(), + retry_count: job.retry_count, + max_retries: job.max_retries, + timeout_ms: job.timeout_ms, + started_at: Instant::now(), + deadline: deadline_from_timeout(job.timeout_ms), + }; + + // Register *before* sending so a fast child can never publish a + // result the reader can't pair with a slot entry. + slot::set(&slots, idx, active); + + let msg = ParentMessage::from(&job); + match writer.send(&msg) { + Ok(()) => { + in_flight[idx].fetch_add(1, Ordering::Relaxed); + } + Err(e) => { + // Roll back the slot install — neither reader nor watchdog + // should fire for this aborted dispatch. + let _ = slot::take(&slots, idx); + log::error!( + "[taskito] failed to send job {} to child {idx}: {e}", + job.id + ); + // Job will be reaped by the scheduler's stale-job reaper. + } } - in_flight[idx].fetch_add(1, Ordering::Relaxed); } - // Graceful shutdown: tell all children to stop - for (i, writer) in writers.iter_mut().enumerate() { - writer.send_shutdown(); - log::info!("[taskito] sent shutdown to prefork child {i}"); + // Stop the watchdog before sending shutdown so it doesn't race with + // children draining their final results. + watchdog_shutdown.store(true, Ordering::SeqCst); + + // Graceful shutdown: tell all live children to stop. + for (idx, writer) in writers.iter_mut().enumerate() { + if let Some(w) = writer.as_mut() { + w.send_shutdown(); + log::info!("[taskito] sent shutdown to prefork child {idx}"); + } } - // Wait for children to exit - let drain_timeout = std::time::Duration::from_secs(30); - for (i, process) in process_handles.iter_mut().enumerate() { - process.wait_or_kill(drain_timeout); - log::info!("[taskito] prefork child {i} exited"); + // Wait for children to exit (or kill after the drain timeout). + for idx in 0..num_workers { + if let Ok(mut guard) = processes[idx].lock() { + if let Some(process) = guard.as_mut() { + process.wait_or_kill(SHUTDOWN_DRAIN); + log::info!("[taskito] prefork child {idx} exited"); + } + } } - // Wait for reader threads + // Reader threads exit when their child closes stdout. for handle in reader_handles { let _ = handle.join(); } + let _ = watchdog_handle.join(); } fn shutdown(&self) { self.shutdown.store(true, Ordering::SeqCst); } } + +/// Spawn child `idx` and its reader thread, plumbing the writer + process into +/// the shared state. Returns the reader thread handle on success, `None` on +/// spawn failure (already logged). +#[allow(clippy::too_many_arguments)] +fn start_child( + idx: usize, + python: &str, + app_path: &str, + writers: &mut [Option], + processes: &Arc>>>, + slots: &SlotState, + in_flight: &Arc>, + result_tx: &Sender, +) -> Option> { + match spawn_child(python, app_path) { + Ok((writer, reader, process)) => { + log::info!("[taskito] prefork child {idx} ready"); + writers[idx] = Some(writer); + if let Ok(mut slot) = processes[idx].lock() { + *slot = Some(process); + } + // Reset the slot for the new child — the killed/dead one's job (if + // any) was already completed by the watchdog or shutdown path. + let _ = slot::take(slots, idx); + in_flight[idx].store(0, Ordering::Relaxed); + + Some(spawn_reader_thread( + idx, + reader, + slots.clone(), + in_flight.clone(), + result_tx.clone(), + )) + } + Err(e) => { + log::error!("[taskito] failed to spawn prefork child {idx}: {e}"); + None + } + } +} + +/// Reader thread: forwards child results to the scheduler. +/// +/// The slot acts as the ownership token — the reader emits a result *only* if +/// it can `take()` the slot entry first. If the watchdog has already taken +/// the slot (deadline expired), the reader silently drops the message because +/// the watchdog has already synthesised the timeout failure. +fn spawn_reader_thread( + idx: usize, + mut reader: ChildReader, + slots: SlotState, + in_flight: Arc>, + result_tx: Sender, +) -> JoinHandle<()> { + thread::Builder::new() + .name(format!("taskito-prefork-reader-{idx}")) + .spawn(move || loop { + match reader.read() { + Ok(msg) => { + let Some(job_result) = msg.into_job_result() else { + continue; + }; + if slot::take(&slots, idx).is_none() { + // Watchdog already completed this job; drop the + // (now-redundant) child message. + continue; + } + in_flight[idx].fetch_sub(1, Ordering::Relaxed); + if result_tx.send(job_result).is_err() { + break; // result channel closed + } + } + Err(e) => { + log::warn!("[taskito] prefork child {idx} reader error: {e}"); + break; + } + } + }) + .expect("failed to spawn prefork reader thread") +} + +/// Convert a per-task timeout in milliseconds to an absolute `Instant` deadline. +/// Returns `None` for `timeout_ms <= 0` (no timeout configured) so the watchdog +/// skips the slot. +fn deadline_from_timeout(timeout_ms: i64) -> Option { + if timeout_ms <= 0 { + None + } else { + Instant::now().checked_add(Duration::from_millis(timeout_ms as u64)) + } +} diff --git a/crates/taskito-python/src/prefork/slot.rs b/crates/taskito-python/src/prefork/slot.rs new file mode 100644 index 0000000..e17b8bd --- /dev/null +++ b/crates/taskito-python/src/prefork/slot.rs @@ -0,0 +1,62 @@ +//! Per-child active-job tracking, shared between dispatcher, reader, and watchdog. +//! +//! Each child runs at most one job at a time, so each slot holds an +//! `Option`. The slot is the single source of truth for "this child +//! has a job in flight": whichever thread successfully `take()`s a `Some` +//! becomes responsible for emitting exactly one `JobResult` and decrementing +//! the in-flight counter — preventing double-completion races between the +//! reader (child finished normally) and the watchdog (child exceeded its +//! deadline). + +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +/// Metadata about a job currently being executed by a child process. +#[derive(Clone)] +pub struct ActiveJob { + pub job_id: String, + pub task_name: String, + pub retry_count: i32, + pub max_retries: i32, + pub timeout_ms: i64, + pub started_at: Instant, + /// Absolute deadline. `None` => no timeout configured. + pub deadline: Option, +} + +/// One mutex-protected `Option` per child, shared across threads. +pub type SlotState = Arc>>>; + +/// Build an empty slot vector for `n` children. +pub fn new_slots(n: usize) -> SlotState { + Arc::new((0..n).map(|_| Mutex::new(None)).collect()) +} + +/// Atomically install `job` in slot `idx`, returning any previous occupant +/// (which would only happen on a programming error — children are sequential). +pub fn set(slots: &SlotState, idx: usize, job: ActiveJob) -> Option { + slots[idx].lock().expect("slot mutex poisoned").replace(job) +} + +/// Atomically take whatever is in slot `idx`, leaving it empty. +pub fn take(slots: &SlotState, idx: usize) -> Option { + slots[idx].lock().expect("slot mutex poisoned").take() +} + +/// Atomically take the slot only if its deadline has passed at `now`. +/// +/// Re-checking the deadline *while holding the lock* is what makes this +/// race-free: if a result arrived between the watchdog's scan and its +/// take, the reader will have cleared the slot first and we return `None`. +pub fn take_if_expired(slots: &SlotState, idx: usize, now: Instant) -> Option { + let mut guard = slots[idx].lock().expect("slot mutex poisoned"); + let expired = guard + .as_ref() + .and_then(|j| j.deadline) + .is_some_and(|d| now >= d); + if expired { + guard.take() + } else { + None + } +} diff --git a/crates/taskito-python/src/prefork/watchdog.rs b/crates/taskito-python/src/prefork/watchdog.rs new file mode 100644 index 0000000..8023d70 --- /dev/null +++ b/crates/taskito-python/src/prefork/watchdog.rs @@ -0,0 +1,93 @@ +//! Per-job timeout enforcer for the prefork pool. +//! +//! A single thread wakes every `TICK` and, for each child slot whose deadline +//! has passed, atomically takes ownership of the job, kills the child process, +//! decrements the in-flight counter, and emits a synthesised +//! `JobResult::Failure { timed_out: true }` — matching the shape produced by +//! the scheduler's `reap_stale_jobs` maintenance task so downstream middleware +//! and event hooks treat both paths identically. + +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +use crossbeam_channel::Sender; + +use taskito_core::scheduler::JobResult; + +use super::child::ChildProcess; +use super::slot::{self, SlotState}; + +/// Watchdog poll interval. 250ms balances kill latency against parent CPU use: +/// a 5s task timeout is enforced within ~5.25s, well inside the issue's +/// "small watchdog tick budget". +pub const TICK: Duration = Duration::from_millis(250); + +/// Spawn the timeout-watchdog thread. +/// +/// The thread exits when `shutdown` is set. Children killed for exceeding +/// their deadline are left for the dispatcher's existing dead-child respawn +/// logic to bring back online on the next dispatched job. +pub fn spawn( + slots: SlotState, + processes: Arc>>>, + in_flight: Arc>, + result_tx: Sender, + shutdown: Arc, +) -> JoinHandle<()> { + thread::Builder::new() + .name("taskito-prefork-watchdog".into()) + .spawn(move || run(slots, processes, in_flight, result_tx, shutdown)) + .expect("failed to spawn prefork watchdog thread") +} + +fn run( + slots: SlotState, + processes: Arc>>>, + in_flight: Arc>, + result_tx: Sender, + shutdown: Arc, +) { + while !shutdown.load(Ordering::Relaxed) { + thread::sleep(TICK); + if shutdown.load(Ordering::Relaxed) { + break; + } + + let now = Instant::now(); + for idx in 0..slots.len() { + let Some(job) = slot::take_if_expired(&slots, idx, now) else { + continue; + }; + + log::warn!( + "[taskito] prefork child {idx} exceeded {timeout}ms timeout on job {job_id}, killing", + timeout = job.timeout_ms, + job_id = job.job_id, + ); + + if let Ok(mut guard) = processes[idx].lock() { + if let Some(process) = guard.as_mut() { + process.kill_and_reap(); + } + } + in_flight[idx].fetch_sub(1, Ordering::Relaxed); + + let result = JobResult::Failure { + job_id: job.job_id, + error: format!("job timed out after {}ms", job.timeout_ms), + retry_count: job.retry_count, + max_retries: job.max_retries, + task_name: job.task_name, + wall_time_ns: job.started_at.elapsed().as_nanos() as i64, + should_retry: true, + timed_out: true, + }; + if result_tx.send(result).is_err() { + // Scheduler already shut down; nothing more to do. + return; + } + } + } +} From 976158a1dd6680f6011ebe7c4be737a005e5e968 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:04:43 +0530 Subject: [PATCH 2/5] test(prefork): cover hung-task watchdog kill path --- tests/python/prefork_apps/__init__.py | 0 tests/python/prefork_apps/timeout_app.py | 41 +++++++ tests/python/test_prefork.py | 137 +++++++++++++++++++++++ 3 files changed, 178 insertions(+) create mode 100644 tests/python/prefork_apps/__init__.py create mode 100644 tests/python/prefork_apps/timeout_app.py diff --git a/tests/python/prefork_apps/__init__.py b/tests/python/prefork_apps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/python/prefork_apps/timeout_app.py b/tests/python/prefork_apps/timeout_app.py new file mode 100644 index 0000000..b546e34 --- /dev/null +++ b/tests/python/prefork_apps/timeout_app.py @@ -0,0 +1,41 @@ +"""Module-level Queue + tasks used by the prefork timeout regression tests. + +Prefork children import the app module independently, so the task registry +must live at a module path resolvable inside the child interpreter — that's +why this module exists as a sibling of ``test_prefork.py`` rather than being +defined inline in the test. + +The DB path comes from ``TASKITO_TIMEOUT_TEST_DB`` so each test run can use a +unique tmp file while still letting the parent and child build identical +Queue instances from this same module. +""" + +from __future__ import annotations + +import os +import time + +from taskito import Queue + +queue = Queue(db_path=os.environ.get("TASKITO_TIMEOUT_TEST_DB", "/tmp/taskito-timeout.db")) + + +@queue.task(timeout=2, max_retries=0) +def hang() -> None: + """Spin forever — used to trigger the watchdog's SIGKILL path.""" + while True: + pass + + +@queue.task() +def quick(x: int) -> int: + """Returns immediately — used to verify timeout=0 (no timeout) is unaffected.""" + return x * 2 + + +@queue.task(timeout=2, max_retries=0) +def sleep_then_finish(seconds: float) -> str: + """Sleeps for `seconds`, then finishes — used to verify the watchdog only + fires when the deadline is actually exceeded.""" + time.sleep(seconds) + return "done" diff --git a/tests/python/test_prefork.py b/tests/python/test_prefork.py index 282f2ce..47b0a60 100644 --- a/tests/python/test_prefork.py +++ b/tests/python/test_prefork.py @@ -2,12 +2,21 @@ from __future__ import annotations +import contextlib +import importlib +import os +import sys import threading +import time +from collections.abc import Iterator from pathlib import Path +from typing import Any import pytest from taskito import Queue +from taskito.context import JobContext +from taskito.middleware import TaskMiddleware def test_prefork_requires_app_path(tmp_path: Path) -> None: @@ -51,3 +60,131 @@ def multiply(x: int, y: int) -> int: assert result == 21 q._inner.request_shutdown() + + +# --------------------------------------------------------------------------- +# Per-job timeout enforcement (issue #81) +# --------------------------------------------------------------------------- + +PREFORK_APP_PATH = "prefork_apps.timeout_app:queue" +PREFORK_APP_DIR = str(Path(__file__).parent) + + +@pytest.fixture +def timeout_app(tmp_path: Path) -> Iterator[object]: + """Set up the module-level timeout-test app with a per-test DB path. + + The Queue inside ``prefork_apps.timeout_app`` is constructed at import time + from ``$TASKITO_TIMEOUT_TEST_DB``, and the prefork child re-imports the + same module fresh in its own interpreter — so the env var must be set in + the parent process before that import happens, and propagates to the + child via inherited env. + """ + db_path = str(tmp_path / "timeout.db") + prev_db = os.environ.get("TASKITO_TIMEOUT_TEST_DB") + prev_pythonpath = os.environ.get("PYTHONPATH") + + os.environ["TASKITO_TIMEOUT_TEST_DB"] = db_path + # Make `prefork_apps.timeout_app` importable in both parent and (inherited) + # child without depending on pytest's rootdir manipulation. + os.environ["PYTHONPATH"] = ( + f"{PREFORK_APP_DIR}{os.pathsep}{prev_pythonpath}" if prev_pythonpath else PREFORK_APP_DIR + ) + if PREFORK_APP_DIR not in sys.path: + sys.path.insert(0, PREFORK_APP_DIR) + + # Force a fresh module import so the Queue picks up the per-test DB path. + sys.modules.pop("prefork_apps.timeout_app", None) + sys.modules.pop("prefork_apps", None) + module = importlib.import_module("prefork_apps.timeout_app") + + try: + yield module + finally: + with contextlib.suppress(Exception): + module.queue._inner.request_shutdown() + if prev_db is None: + os.environ.pop("TASKITO_TIMEOUT_TEST_DB", None) + else: + os.environ["TASKITO_TIMEOUT_TEST_DB"] = prev_db + if prev_pythonpath is None: + os.environ.pop("PYTHONPATH", None) + else: + os.environ["PYTHONPATH"] = prev_pythonpath + + +def _start_prefork_worker(queue: Queue) -> threading.Thread: + """Start a prefork worker for ``queue`` in a daemon thread.""" + thread = threading.Thread( + target=queue.run_worker, + kwargs={"pool": "prefork", "app": PREFORK_APP_PATH}, + daemon=True, + ) + thread.start() + return thread + + +def _wait_for_terminal(job: Any, timeout: float) -> str: + """Poll a JobResult.refresh() until the status is terminal or `timeout` elapses.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + job.refresh() + status: str = job.status + if status in {"complete", "failed", "dead", "cancelled"}: + return status + time.sleep(0.1) + job.refresh() + final_status: str = job.status + return final_status + + +def test_prefork_kills_hung_task(timeout_app: object) -> None: + """A task that hangs past its `timeout=` is SIGKILLed by the watchdog and + reported as a timeout failure within the timeout + watchdog tick budget.""" + timeouts_seen: list[str] = [] + + class TimeoutSpy(TaskMiddleware): + def on_timeout(self, ctx: JobContext) -> None: + timeouts_seen.append(ctx.id) + + queue: Queue = timeout_app.queue # type: ignore[attr-defined] + queue._global_middleware.append(TimeoutSpy()) + + started = time.monotonic() + job = timeout_app.hang.delay() # type: ignore[attr-defined] + _start_prefork_worker(queue) + + # timeout=2s, watchdog tick=250ms → kill within ~2.25s; allow generous + # headroom for child spawn and CI noise. + status = _wait_for_terminal(job, timeout=15) + elapsed = time.monotonic() - started + + assert status == "dead", f"expected 'dead', got {status!r} (error={job.error!r})" + assert "timed out" in (job.error or "").lower() + assert elapsed < 12, f"hung task took {elapsed:.1f}s to be killed (expected < 12s)" + assert job.id in timeouts_seen, "on_timeout middleware did not fire" + + +def test_prefork_no_timeout_unaffected(timeout_app: object) -> None: + """A task with no timeout (timeout=0) runs to completion — the watchdog + must not kill jobs that have no deadline configured.""" + queue: Queue = timeout_app.queue # type: ignore[attr-defined] + + job = timeout_app.quick.delay(21) # type: ignore[attr-defined] + _start_prefork_worker(queue) + + result = job.result(timeout=15) + assert result == 42 + + +def test_prefork_finishes_before_deadline(timeout_app: object) -> None: + """A task that completes well before its deadline returns normally — the + watchdog only fires when the deadline is actually crossed.""" + queue: Queue = timeout_app.queue # type: ignore[attr-defined] + + # timeout=2s, sleep 0.5s — should finish cleanly. + job = timeout_app.sleep_then_finish.delay(0.5) # type: ignore[attr-defined] + _start_prefork_worker(queue) + + result = job.result(timeout=15) + assert result == "done" From 8f4ba99600dd9c5d7c67317f72a2d2fe24d898de Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:50:59 +0530 Subject: [PATCH 3/5] test(prefork): skip timeout tests on Windows --- tests/python/test_prefork.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/python/test_prefork.py b/tests/python/test_prefork.py index 47b0a60..8e8e0ff 100644 --- a/tests/python/test_prefork.py +++ b/tests/python/test_prefork.py @@ -66,6 +66,16 @@ def multiply(x: int, y: int) -> int: # Per-job timeout enforcement (issue #81) # --------------------------------------------------------------------------- +# The prefork pool is Unix-oriented: child processes communicate over anonymous +# stdio pipes, which on Windows have different blocking semantics that make +# parent-side reader threads hang after `TerminateProcess`. Per-job timeout +# behaviour itself is identical, but the surrounding pool plumbing isn't +# Windows-ready, so these end-to-end tests are skipped there. +prefork_unix_only = pytest.mark.skipif( + sys.platform == "win32", + reason="prefork pool is Unix-only — child stdio pipe semantics differ on Windows", +) + PREFORK_APP_PATH = "prefork_apps.timeout_app:queue" PREFORK_APP_DIR = str(Path(__file__).parent) @@ -138,6 +148,7 @@ def _wait_for_terminal(job: Any, timeout: float) -> str: return final_status +@prefork_unix_only def test_prefork_kills_hung_task(timeout_app: object) -> None: """A task that hangs past its `timeout=` is SIGKILLed by the watchdog and reported as a timeout failure within the timeout + watchdog tick budget.""" @@ -165,6 +176,7 @@ def on_timeout(self, ctx: JobContext) -> None: assert job.id in timeouts_seen, "on_timeout middleware did not fire" +@prefork_unix_only def test_prefork_no_timeout_unaffected(timeout_app: object) -> None: """A task with no timeout (timeout=0) runs to completion — the watchdog must not kill jobs that have no deadline configured.""" @@ -177,6 +189,7 @@ def test_prefork_no_timeout_unaffected(timeout_app: object) -> None: assert result == 42 +@prefork_unix_only def test_prefork_finishes_before_deadline(timeout_app: object) -> None: """A task that completes well before its deadline returns normally — the watchdog only fires when the deadline is actually crossed.""" From 8697b2a754e577213d3bde4402480de06aaf505c Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 30 Apr 2026 15:17:21 +0530 Subject: [PATCH 4/5] feat(prefork): reject pool='prefork' on Windows with clear error --- py_src/taskito/app.py | 11 +++++++++-- tests/python/test_prefork.py | 8 ++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py index 2071340..de62304 100644 --- a/py_src/taskito/app.py +++ b/py_src/taskito/app.py @@ -24,6 +24,7 @@ import logging import os import signal +import sys import threading import urllib.parse import uuid @@ -1178,8 +1179,14 @@ def run_worker( app: Import path to the Queue instance (e.g. ``"myapp:queue"``). Required when ``pool="prefork"``. """ - if pool == "prefork" and not app: - raise ValueError("app= is required when pool='prefork' (e.g. app='myapp:queue')") + if pool == "prefork": + if sys.platform == "win32": + raise NotImplementedError( + "pool='prefork' is not supported on Windows. " + "Use pool='thread' (default) or run on Linux/macOS." + ) + if not app: + raise ValueError("app= is required when pool='prefork' (e.g. app='myapp:queue')") queue_list = list(queues) if queues else None # Make queue accessible from job context (for current_job.update_progress()) diff --git a/tests/python/test_prefork.py b/tests/python/test_prefork.py index 8e8e0ff..cd5aa19 100644 --- a/tests/python/test_prefork.py +++ b/tests/python/test_prefork.py @@ -31,6 +31,14 @@ def noop() -> None: queue.run_worker(pool="prefork") +def test_prefork_rejected_on_windows(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """pool='prefork' on Windows fails fast with NotImplementedError.""" + monkeypatch.setattr(sys, "platform", "win32") + queue = Queue(db_path=str(tmp_path / "test.db")) + with pytest.raises(NotImplementedError, match="not supported on Windows"): + queue.run_worker(pool="prefork", app="x:y") + + def test_prefork_basic_execution(tmp_path: Path) -> None: """A task enqueued and processed by a prefork worker returns the correct result. From 4d48de30e433c588e06bf0b63e6d644ccf2f63d5 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 30 Apr 2026 16:01:32 +0530 Subject: [PATCH 5/5] test(prefork): skip app-required test on Windows (unreachable) --- tests/python/test_prefork.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/python/test_prefork.py b/tests/python/test_prefork.py index cd5aa19..02897a6 100644 --- a/tests/python/test_prefork.py +++ b/tests/python/test_prefork.py @@ -19,6 +19,10 @@ from taskito.middleware import TaskMiddleware +@pytest.mark.skipif( + sys.platform == "win32", + reason="prefork is rejected on Windows before app= is checked", +) def test_prefork_requires_app_path(tmp_path: Path) -> None: """pool='prefork' without app= raises ValueError.""" queue = Queue(db_path=str(tmp_path / "test.db"))