fix(app): correct enqueue_many middleware args and option propagation#106
Merged
Conversation
`enqueue_many` had three bugs in the on_enqueue dispatch: 1. Always passed `args_list[0]` and `kw_list[0]` to every middleware call, so middleware could not distinguish jobs in the batch. 2. Created a fresh empty options dict per call that was never read back, so any option mutations (priority, queue, delay, ...) were silently discarded. 3. Bare `except: pass` swallowed middleware exceptions with no logging. Restructure the dispatch to mirror single-enqueue: build a per-job options dict initialised from the call-site arguments, run middleware *before* `enqueue_batch` so mutations can take effect, then read the mutated options back into the per-job lists. Replace the silent except with `logger.exception(...)` so misbehaving middleware is observable. To support per-job priority/retries/timeout mutations the Rust `enqueue_batch` signature is widened from `Option<Vec<i32>>` to `Option<Vec<Option<i32>>>` (matching the existing pattern for `delay_seconds_list`, `metadata_list`, etc.); type stub follows. Three regression tests cover the per-job args, the option mutation propagation, and the logged-exception path.
This was referenced May 2, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
P0-3 from the pre-release audit.
enqueue_manyhad three bugs in theon_enqueuemiddleware dispatch that the test suite was missing entirely:args_list[0]andkw_list[0]to every middleware call, so middleware could not distinguish jobs in the batch.optionsdict per call that was never read back, so any option mutations (priority, queue, delay, …) were silently discarded.except: passswallowed middleware exceptions with no logging.The fix restructures the dispatch to match single-enqueue's contract: per-job options dict initialised from the call-site arguments, middleware runs before
enqueue_batchso mutations can take effect, mutated values are read back into the per-job lists.What changed
py_src/taskito/app.py—enqueue_many()rewritten to:optionsdict from the call-site arguments (uniformpriority/queue/etc. broadcast across the batch; per-job lists threaded through).on_enqueuemiddleware against each per-job dict beforeenqueue_batch, replacing the silentexcept: passwithlogger.exception("middleware on_enqueue() error").crates/taskito-python/src/py_queue/mod.rs—enqueue_batchsignature widened fromOption<Vec<i32>>toOption<Vec<Option<i32>>>forpriorities/max_retries_list/timeouts. Matches the existing pattern already used fordelay_seconds_list/metadata_list/expires_list/result_ttl_list. Allows middleware to set per-job priority while letting other jobs fall back to queue defaults.py_src/taskito/_taskito.pyi— type stub follows the Rust signature change.Tests
Three new regression tests in
tests/python/test_batch.py:test_enqueue_many_invokes_on_enqueue_per_job— middleware sees each job's ownargs/kwargs.test_enqueue_many_applies_option_mutations— middleware mutatingoptions["priority"]per-job propagates to the actual jobs.test_enqueue_many_logs_middleware_exceptions— exceptions from misbehaving middleware are logged vialogger.exceptionand don't block the enqueue.Test plan
cargo test --workspace— 89 passcargo clippy --workspace --all-targets -- -D warningscleancargo check --features postgrescleancargo check --features rediscleanuv run python -m pytest tests/python/— 488 pass, 9 skipped (was 485, +3 new)uv run ruff check py_src/ tests/cleanuv run mypy py_src/taskito/clean