Skip to content

feat: composable task predicates with built-in recipes#156

Merged
pratyush618 merged 11 commits into
masterfrom
feat/task-predicates
May 11, 2026
Merged

feat: composable task predicates with built-in recipes#156
pratyush618 merged 11 commits into
masterfrom
feat/task-predicates

Conversation

@pratyush618
Copy link
Copy Markdown
Collaborator

Summary

Adds a composable Predicate system for gating task execution at three layers — enqueue, worker dispatch, and middleware — with a library of predefined recipes (time windows, queue health, feature flags, etc.).

from taskito.predicates import is_business_hours, queue_paused, by_priority_at_least

@queue.task(
    predicate=is_business_hours(tz="US/Pacific")
              & ~queue_paused()
              | by_priority_at_least(8),
    on_false="defer",
)
def send_report(): ...

What's in

  • Core typesPredicate ABC with &/|/~ operator overloading, short-circuit AndPredicate/OrPredicate/NotPredicate, _CallablePredicate adapter for plain callables, coerce_predicate().
  • Outcomes — return True / False / Defer(seconds=N) / Cancel(reason=...) from evaluate(). Async predicates are awaited transparently via run_maybe_async.
  • Fail-closed evaluationevaluate_predicate() swallows exceptions, returns False, records on PredicateMetrics.
  • Built-in recipes in taskito.predicates.recipes/ — time-based (is_business_hours, in_time_window, after, before, is_weekend, in_timezone), job-attribute (by_queue, by_task, by_priority_at_least, retry_count_under, payload_matches), system-state (queue_size_under, queue_paused, error_rate_under), external-config (env_var_truthy, feature_flag with pluggable FeatureFlagProvider).
  • Enqueue-time gating in Queue.enqueue() / enqueue_many()Defer bumps delay, Cancel raises PredicateRejectedError.
  • Worker-dispatch gating in sync _wrap_task and async AsyncTaskExecutor._executeCancel raises TaskCancelledError; Defer re-enqueues a fresh job with delay and cancels the current one.
  • Middleware filteringTaskMiddleware accepts predicate=. legacy_task_filter_to_predicate() keeps contrib's task_filter=Callable[[str], bool] working. Sentry/OTel/Prometheus middlewares updated to forward both.
  • EventsPREDICATE_DEFERRED, PREDICATE_CANCELLED, PREDICATE_REJECTED with phase ("enqueue"/"dispatch") discriminator.
  • Metrics — per-queue PredicateMetrics (allowed / denied / deferred / cancelled / errors).
  • Docs — new docs/content/docs/guides/core/predicates.mdx page.

Layered behavior

Outcome Enqueue time Worker dispatch Middleware
True proceed proceed run hooks
False (on_false="defer") bump delay re-enqueue + cancel current skip hooks
False (on_false="cancel") raise PredicateRejectedError TaskCancelledError skip hooks
Defer(seconds=N) add N to delay re-enqueue with N delay + cancel current skip hooks
Cancel(reason=...) raise PredicateRejectedError TaskCancelledError skip hooks

System-state recipes use a per-evaluation memo (PredicateContext._state_cache) so composed predicates that read queue stats don't hammer storage.

Test plan

  • cargo check --workspace (default, postgres, redis, native-async) — all pass
  • cargo test --workspace — passes
  • uv run python -m pytest tests/596 passed, 9 skipped, including:
    • test_predicates_core.py — 21 tests (operator algebra, short-circuit, fail-closed, async)
    • test_predicates_recipes.py — 32 tests (every built-in recipe + validation)
    • test_predicates_enqueue.py — 14 tests (allow/defer/cancel paths, events, metrics, enqueue_many semantics)
    • test_predicates_worker.py — 7 tests (real workers, sync defer/re-enqueue round-trip, fail-closed at dispatch)
    • test_predicates_middleware.py — 6 tests (legacy task_filter back-compat, predicate gate)
  • uv run ruff check and uv run mypy py_src/ — clean

@pratyush618 pratyush618 merged commit 17e54c5 into master May 11, 2026
21 checks passed
@pratyush618 pratyush618 deleted the feat/task-predicates branch May 11, 2026 07:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant