From 80e32e8c7dd1d6b8bd3f7bbe29bd079988cc62f6 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 14 May 2026 01:51:02 +0530 Subject: [PATCH 1/6] fix(predicates): remove inline imports in boolean combinators MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AndPredicate / OrPredicate / NotPredicate each imported _resolve_outcome inside the evaluate() method body. The cycle they were defending against doesn't exist at runtime — evaluate.py only imports core under TYPE_CHECKING — so hoist the import to module scope. --- py_src/taskito/predicates/core.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/py_src/taskito/predicates/core.py b/py_src/taskito/predicates/core.py index ec7188f..fad5322 100644 --- a/py_src/taskito/predicates/core.py +++ b/py_src/taskito/predicates/core.py @@ -19,6 +19,7 @@ from collections.abc import Awaitable, Callable from typing import TYPE_CHECKING, Any, ClassVar +from taskito.predicates.evaluate import _resolve_outcome from taskito.predicates.outcomes import Cancel, Defer from taskito.predicates.registry import ( PredicateValidationError, @@ -204,8 +205,6 @@ def __init__(self, left: Predicate, right: Predicate) -> None: self._right = right def evaluate(self, ctx: PredicateContext) -> PredicateReturn: - from taskito.predicates.evaluate import _resolve_outcome - left = _resolve_outcome(self._left, ctx) if isinstance(left, (Defer, Cancel)) or left is False: return left @@ -237,8 +236,6 @@ def __init__(self, left: Predicate, right: Predicate) -> None: self._right = right def evaluate(self, ctx: PredicateContext) -> PredicateReturn: - from taskito.predicates.evaluate import _resolve_outcome - left = _resolve_outcome(self._left, ctx) if left is True: return True @@ -281,8 +278,6 @@ def __init__(self, inner: Predicate) -> None: self._inner = inner def evaluate(self, ctx: PredicateContext) -> PredicateReturn: - from taskito.predicates.evaluate import _resolve_outcome - outcome = _resolve_outcome(self._inner, ctx) if isinstance(outcome, (Defer, Cancel)): return outcome From 3505e8edcd0c4adb68c4f1a792360bcb23eeb283 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 14 May 2026 01:51:13 +0530 Subject: [PATCH 2/6] test(predicates): add focused unit and integration coverage 30 tests covering the boolean combinators (And/Or/Not short-circuit semantics and Defer/Cancel pass-through), fail-closed evaluation + metric recording, JSON and string-DSL round-trips, recipe behaviour (after/before/in_time_window/payload_matches/env_var_truthy), the callable adapter, custom predicate registration, and three queue integration tests for enqueue-time cancel/defer. --- tests/core/test_predicates.py | 390 ++++++++++++++++++++++++++++++++++ 1 file changed, 390 insertions(+) create mode 100644 tests/core/test_predicates.py diff --git a/tests/core/test_predicates.py b/tests/core/test_predicates.py new file mode 100644 index 0000000..6d29eca --- /dev/null +++ b/tests/core/test_predicates.py @@ -0,0 +1,390 @@ +"""Tests for the predicate AST, recipes, and queue integration. + +Covers: + +* Boolean combinators (And/Or/Not) — short-circuit semantics and + pass-through of Defer / Cancel. +* Serialization round-trips through both JSON (``to_dict``/``from_dict``) + and the string DSL (``parse``/``format_predicate``). +* Fail-closed evaluation: any exception raised in ``evaluate`` returns + ``False`` and bumps the error counter. 
+* Recipe behaviour for the built-in time / config / system / attribute + predicates. +* Integration with :class:`Queue` — predicates registered via + ``@queue.task(predicate=...)`` correctly defer or cancel at enqueue. +""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import ClassVar +from unittest.mock import patch + +import pytest + +from taskito import Queue +from taskito.exceptions import PredicateRejectedError +from taskito.predicates import ( + AndPredicate, + Cancel, + Defer, + NotPredicate, + OrPredicate, + Predicate, + PredicateContext, + PredicateMetrics, + PredicateValidationError, + after, + before, + coerce_predicate, + default_registry, + env_var_truthy, + evaluate_predicate, + format_predicate, + in_time_window, + is_business_hours, + is_weekend, + parse, + payload_matches, + queue_paused, + register_predicate, +) +from taskito.predicates.core import _CallablePredicate + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class _Fixed(Predicate): + """Predicate that returns whatever it was constructed with. + + Not registered in the default registry — ``OP=None``. Used to drive + the boolean combinators with known outcomes. + """ + + value: bool | Defer | Cancel = True + OP: ClassVar[str | None] = None + + def evaluate(self, ctx: PredicateContext) -> bool | Defer | Cancel: + return self.value + + +@dataclass(frozen=True) +class _Boom(Predicate): + """Predicate that always raises — exercises fail-closed evaluation.""" + + OP: ClassVar[str | None] = None + + def evaluate(self, ctx: PredicateContext) -> bool: + raise RuntimeError("intentional predicate failure") + + +def _ctx(task_name: str = "tasks.demo", queue: str = "default") -> PredicateContext: + """Minimal context for unit tests that don't need a Queue back-reference.""" + return PredicateContext(task_name=task_name, queue=queue) + + +# --------------------------------------------------------------------------- +# Boolean combinators +# --------------------------------------------------------------------------- + + +def test_and_short_circuits_on_false() -> None: + """AndPredicate must not evaluate right when left returns False.""" + right_ran = [] + + @dataclass(frozen=True) + class _Tracker(Predicate): + OP: ClassVar[str | None] = None + + def evaluate(self, ctx: PredicateContext) -> bool: + right_ran.append(True) + return True + + combined = AndPredicate(_Fixed(False), _Tracker()) + assert combined.evaluate(_ctx()) is False + assert right_ran == [] + + +def test_and_evaluates_right_when_left_is_true() -> None: + combined = AndPredicate(_Fixed(True), _Fixed(False)) + assert combined.evaluate(_ctx()) is False + + +def test_and_propagates_defer_from_left() -> None: + deferred = Defer(seconds=42.0) + combined = AndPredicate(_Fixed(deferred), _Fixed(True)) + assert combined.evaluate(_ctx()) is deferred + + +def test_or_short_circuits_on_true() -> None: + right_ran = [] + + @dataclass(frozen=True) + class _Tracker(Predicate): + OP: ClassVar[str | None] = None + + def evaluate(self, ctx: PredicateContext) -> bool: + right_ran.append(True) + return False + + combined = OrPredicate(_Fixed(True), _Tracker()) + assert combined.evaluate(_ctx()) is True + assert right_ran == [] + + +def test_or_prefers_cancel_over_defer_when_both_deny() -> None: + combined = 
OrPredicate(_Fixed(Defer(seconds=10.0)), _Fixed(Cancel(reason="nope")))
+    outcome = combined.evaluate(_ctx())
+    assert isinstance(outcome, Cancel)
+    assert outcome.reason == "nope"
+
+
+def test_or_returns_false_when_both_plain_false() -> None:
+    assert OrPredicate(_Fixed(False), _Fixed(False)).evaluate(_ctx()) is False
+
+
+def test_not_inverts_bools_and_passes_through_terminal_outcomes() -> None:
+    assert NotPredicate(_Fixed(True)).evaluate(_ctx()) is False
+    assert NotPredicate(_Fixed(False)).evaluate(_ctx()) is True
+
+    defer = Defer(seconds=5.0)
+    assert NotPredicate(_Fixed(defer)).evaluate(_ctx()) is defer
+
+    cancel = Cancel(reason="halt")
+    assert NotPredicate(_Fixed(cancel)).evaluate(_ctx()) is cancel
+
+
+def test_operator_overloads_build_expected_ast() -> None:
+    left = is_weekend()
+    right = queue_paused()
+    assert isinstance(left & right, AndPredicate)
+    assert isinstance(left | right, OrPredicate)
+    assert isinstance(~left, NotPredicate)
+
+
+# ---------------------------------------------------------------------------
+# Fail-closed evaluation + metrics
+# ---------------------------------------------------------------------------
+
+
+def test_evaluate_predicate_fail_closed_on_exception() -> None:
+    metrics = PredicateMetrics()
+    outcome = evaluate_predicate(_Boom(), _ctx(), metrics=metrics)
+    assert outcome is False
+    snapshot = metrics.snapshot()
+    assert snapshot["errors"] == 1
+    # Fail-closed outcome is recorded as "denied", not "allowed".
+    assert snapshot["denied"] == 1
+    assert snapshot["allowed"] == 0
+
+
+def test_evaluate_predicate_records_each_outcome() -> None:
+    metrics = PredicateMetrics()
+    evaluate_predicate(_Fixed(True), _ctx(), metrics=metrics)
+    evaluate_predicate(_Fixed(False), _ctx(), metrics=metrics)
+    evaluate_predicate(_Fixed(Defer(seconds=1.0)), _ctx(), metrics=metrics)
+    evaluate_predicate(_Fixed(Cancel()), _ctx(), metrics=metrics)
+    snap = metrics.snapshot()
+    assert snap == {"allowed": 1, "denied": 1, "deferred": 1, "cancelled": 1, "errors": 0}
+
+
+# ---------------------------------------------------------------------------
+# Serialization round-trips
+# ---------------------------------------------------------------------------
+
+
+def test_json_round_trip_simple_recipe() -> None:
+    original = is_business_hours(start_hour=10, end_hour=15, tz="UTC")
+    payload = original.to_dict()
+    rebuilt = Predicate.from_dict(payload)
+    assert rebuilt.to_dict() == payload
+
+
+def test_json_round_trip_composed_tree() -> None:
+    original = is_weekend(tz="UTC") & ~queue_paused()
+    payload = original.to_dict()
+    rebuilt = Predicate.from_dict(payload)
+    # The serialized form is stable, so equality of the dicts is a strong
+    # check that both the operator nodes and their leaves round-trip.
+    assert rebuilt.to_dict() == payload
+
+
+def test_string_dsl_round_trip() -> None:
+    original = is_business_hours(tz="UTC") & ~queue_paused()
+    rendered = format_predicate(original)
+    parsed = parse(rendered)
+    assert parsed.to_dict() == original.to_dict()
+
+
+def test_from_dict_unknown_op_raises() -> None:
+    with pytest.raises(PredicateValidationError):
+        Predicate.from_dict({"op": "nonexistent_op_xyz"})
+
+
+def test_cancel_reason_survives_combinator_pass_through() -> None:
+    cancel = Cancel(reason="quota exhausted")
+    # Cancel itself is an outcome, not a predicate — pass it through
+    # the And combinator instead and verify the leaf survives evaluation.
+ pred = AndPredicate(_Fixed(True), _Fixed(cancel)) + out = pred.evaluate(_ctx()) + assert isinstance(out, Cancel) + assert out.reason == "quota exhausted" + + +# --------------------------------------------------------------------------- +# Recipe-level behaviour +# --------------------------------------------------------------------------- + + +def test_after_defers_when_target_in_future() -> None: + target = datetime.now(timezone.utc) + timedelta(hours=1) + outcome = after(target).evaluate(_ctx()) + assert isinstance(outcome, Defer) + assert outcome.seconds > 0 + # Sanity: within an hour-ish window allowing for jitter. + assert outcome.seconds <= 3700 + + +def test_after_allows_when_target_in_past() -> None: + past = datetime.now(timezone.utc) - timedelta(seconds=10) + assert after(past).evaluate(_ctx()) is True + + +def test_before_allows_when_target_in_future() -> None: + target = datetime.now(timezone.utc) + timedelta(hours=1) + assert before(target).evaluate(_ctx()) is True + + +def test_before_denies_when_target_in_past() -> None: + past = datetime.now(timezone.utc) - timedelta(seconds=10) + assert before(past).evaluate(_ctx()) is False + + +def test_in_time_window_defers_outside_window() -> None: + # Pick a one-minute window 30 minutes ahead so we are definitely + # outside it regardless of wall-clock time at test runtime. + now = datetime.now(timezone.utc) + start_minutes = (now.hour * 60 + now.minute + 30) % (24 * 60) + end_minutes = (start_minutes + 1) % (24 * 60) + if end_minutes <= start_minutes: + # The chosen window wraps midnight; the recipe rejects such + # configurations. Skip to a non-wrapping pair instead. + start_minutes = 0 + end_minutes = 1 + fmt = lambda m: f"{m // 60:02d}:{m % 60:02d}" # noqa: E731 - tight test helper + outcome = in_time_window(fmt(start_minutes), fmt(end_minutes), tz="UTC").evaluate(_ctx()) + assert isinstance(outcome, Defer) + assert outcome.seconds > 0 + + +def test_payload_matches_kwargs_path() -> None: + pred = payload_matches("kwargs.tenant", "acme") + ctx_hit = PredicateContext(task_name="t", queue="q", kwargs={"tenant": "acme"}) + ctx_miss = PredicateContext(task_name="t", queue="q", kwargs={"tenant": "other"}) + ctx_absent = PredicateContext(task_name="t", queue="q", kwargs={}) + + assert pred.evaluate(ctx_hit) is True + assert pred.evaluate(ctx_miss) is False + assert pred.evaluate(ctx_absent) is False + + +def test_env_var_truthy_reads_environ() -> None: + pred = env_var_truthy("TASKITO_TEST_PREDICATE_FLAG") + with patch.dict(os.environ, {"TASKITO_TEST_PREDICATE_FLAG": "yes"}, clear=False): + assert pred.evaluate(_ctx()) is True + with patch.dict(os.environ, {"TASKITO_TEST_PREDICATE_FLAG": "0"}, clear=False): + assert pred.evaluate(_ctx()) is False + os.environ.pop("TASKITO_TEST_PREDICATE_FLAG", None) + assert pred.evaluate(_ctx()) is False + + +def test_defer_rejects_negative_seconds() -> None: + with pytest.raises(ValueError): + Defer(seconds=-1.0) + + +# --------------------------------------------------------------------------- +# Coercion + callable adapter +# --------------------------------------------------------------------------- + + +def test_coerce_predicate_wraps_callables() -> None: + def gate(ctx: PredicateContext) -> bool: + return ctx.task_name.startswith("ok.") + + coerced = coerce_predicate(gate) + assert isinstance(coerced, _CallablePredicate) + assert coerced.evaluate(_ctx("ok.go")) is True + assert coerced.evaluate(_ctx("no.way")) is False + + +def test_coerce_predicate_passes_through_existing_predicate() -> None: 
+    pred = is_weekend()
+    assert coerce_predicate(pred) is pred
+
+
+def test_coerce_predicate_none_is_none() -> None:
+    assert coerce_predicate(None) is None
+
+
+# ---------------------------------------------------------------------------
+# Custom registration
+# ---------------------------------------------------------------------------
+
+
+def test_register_predicate_round_trips_through_default_registry() -> None:
+    op_name = "tests_predicates_custom_allow"
+
+    @dataclass(frozen=True)
+    class CustomAllow(Predicate):
+        OP: ClassVar[str | None] = op_name
+
+        def evaluate(self, ctx: PredicateContext) -> bool:
+            return True
+
+    # Class auto-registers via __init_subclass__; the explicit call is a
+    # no-op duplicate registration but must not raise.
+    register_predicate(op_name, CustomAllow, replace=True)
+    rebuilt = Predicate.from_dict({"op": op_name})
+    assert isinstance(rebuilt, CustomAllow)
+    assert op_name in default_registry()
+
+
+# ---------------------------------------------------------------------------
+# Queue integration — enqueue-time gating
+# ---------------------------------------------------------------------------
+
+
+def test_task_predicate_cancel_raises_at_enqueue(queue: Queue) -> None:
+    @queue.task(predicate=_Fixed(Cancel(reason="quota")))
+    def gated() -> None:  # pragma: no cover - never runs
+        pass
+
+    with pytest.raises(PredicateRejectedError) as excinfo:
+        gated.delay()
+    assert "gated" in str(excinfo.value)
+
+
+def test_task_predicate_false_with_cancel_raises_at_enqueue(queue: Queue) -> None:
+    @queue.task(predicate=_Fixed(False), on_false="cancel")
+    def hard_off() -> None:  # pragma: no cover - never runs
+        pass
+
+    with pytest.raises(PredicateRejectedError):
+        hard_off.delay()
+
+
+def test_task_predicate_defer_bumps_delay(queue: Queue) -> None:
+    @queue.task(predicate=_Fixed(Defer(seconds=5.0)))
+    def deferred_task() -> None:  # pragma: no cover - not dispatched in this test
+        pass
+
+    # Predicate returning Defer is allowed to enqueue with an additional
+    # delay; the enqueue call must succeed and return a job.
+    job = deferred_task.delay()
+    assert job.id is not None
From af9033b386666e8c73e8550c64492729a08b3af3 Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Thu, 14 May 2026 01:51:24 +0530
Subject: [PATCH 3/6] refactor(predicates): extract QueuePredicateMixin from app.py

The predicate state (five instance dicts plus PredicateMetrics) and the
three gating methods (_apply_enqueue_predicate, _apply_dispatch_predicate,
_reenqueue_after_defer) plus the inspection / registration API
(list_predicates, predicate_for, register_predicate) all moved to a new
QueuePredicateMixin.

Event-emission boilerplate was lifted into small helpers on the mixin
to remove the four repeated _emit_event(...) blocks.

app.py drops from 901 to 619 LOC; the mixin owns ~280 LOC of cohesive
predicate logic.
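
A sketch of the intended composition (illustrative only; bodies and the
remaining mixin bases elided):

    class Queue(QueueDecoratorMixin, QueuePredicateMixin, ...):
        def __init__(self, ...):
            ...
            self._init_predicate_state()  # five dicts + PredicateMetrics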
--- py_src/taskito/app.py | 291 +----------------------- py_src/taskito/mixins/__init__.py | 4 + py_src/taskito/mixins/predicates.py | 336 ++++++++++++++++++++++++++++ 3 files changed, 345 insertions(+), 286 deletions(-) create mode 100644 py_src/taskito/mixins/predicates.py diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py index 1040d12..625377d 100644 --- a/py_src/taskito/app.py +++ b/py_src/taskito/app.py @@ -28,7 +28,6 @@ from taskito._taskito import PyQueue from taskito.async_support.mixins import AsyncQueueMixin from taskito.events import EventBus, EventType -from taskito.exceptions import PredicateRejectedError, TaskCancelledError from taskito.interception import ArgumentInterceptor from taskito.interception.built_in import build_default_registry from taskito.interception.metrics import InterceptionMetrics @@ -40,20 +39,11 @@ QueueLifecycleMixin, QueueLockMixin, QueueOperationsMixin, + QueuePredicateMixin, QueueResourceMixin, + QueueRuntimeConfigMixin, QueueSettingsMixin, ) -from taskito.predicates.context import PredicateContext -from taskito.predicates.core import Predicate as _Predicate -from taskito.predicates.evaluate import evaluate_predicate -from taskito.predicates.metrics import PredicateMetrics -from taskito.predicates.outcomes import Cancel, Defer -from taskito.predicates.registry import ( - PredicateValidationError, -) -from taskito.predicates.registry import ( - register_predicate as _register_predicate, -) from taskito.proxies import ProxyRegistry from taskito.proxies.built_in import register_builtin_handlers from taskito.proxies.metrics import ProxyMetrics @@ -84,6 +74,8 @@ class QueueWorkflowMixin: # type: ignore[no-redef] class Queue( QueueDecoratorMixin, + QueuePredicateMixin, + QueueRuntimeConfigMixin, QueueResourceMixin, QueueEventsMixin, QueueLifecycleMixin, @@ -226,12 +218,7 @@ def __init__( self._global_middleware: list[TaskMiddleware] = middleware or [] self._task_middleware: dict[str, list[TaskMiddleware]] = {} self._task_retry_filters: dict[str, dict[str, list[type[Exception]]]] = {} - self._task_predicates: dict[str, Any] = {} - self._task_predicate_on_false: dict[str, str] = {} - self._task_predicate_extras: dict[str, dict[str, Any]] = {} - self._task_default_defer: dict[str, float] = {} - self._task_predicate_serialized: dict[str, dict[str, Any] | None] = {} - self._predicate_metrics = PredicateMetrics() + self._init_predicate_state() self._drain_timeout = drain_timeout self._queue_configs: dict[str, dict[str, Any]] = {} self._event_bus = EventBus(max_workers=event_workers) @@ -319,274 +306,6 @@ def _deserialize_payload(self, task_name: str, payload: bytes) -> tuple: """Deserialize a job payload using the per-task or queue-level serializer.""" return self._get_serializer(task_name).loads(payload) # type: ignore[no-any-return] - # -- Predicate inspection / registration ----------------------------- - - def list_predicates(self) -> dict[str, dict[str, Any] | None]: - """Return the serialized predicate (or ``None`` for bare callables) - registered for every task that has one. - - The values are JSON-safe dicts produced by - :meth:`Predicate.to_dict`. Consumers (dashboard, audit logs) can - feed each value back through :meth:`Predicate.from_dict` to - rebuild the AST. 
- """ - return dict(self._task_predicate_serialized) - - def predicate_for(self, task_name: str) -> dict[str, Any] | None: - """Return the serialized predicate for ``task_name`` or ``None``.""" - return self._task_predicate_serialized.get(task_name) - - def register_predicate(self, op: str, *, replace: bool = False) -> Callable[[type], type]: - """Class decorator: register a custom :class:`Predicate` subclass. - - Example:: - - from taskito.predicates import Predicate - - @queue.register_predicate("tenant_quota_under") - class TenantQuotaUnder(Predicate): - OP = "tenant_quota_under" - ... - - The ``OP`` set on the class must match ``op``. Once registered, - the predicate participates in JSON serialization and the string - DSL just like a built-in recipe. - """ - - def decorator(cls: type) -> type: - if not isinstance(cls, type) or not issubclass(cls, _Predicate): - raise PredicateValidationError( - f"register_predicate target must subclass Predicate; got {cls!r}" - ) - declared = cls.__dict__.get("OP") - if declared and declared != op: - raise PredicateValidationError( - f"OP mismatch: decorator says {op!r}, class declares {declared!r}" - ) - cls.OP = op - _register_predicate(op, cls, replace=replace) - return cls - - return decorator - - def _apply_dispatch_predicate( - self, - *, - task_name: str, - args: tuple, - kwargs: dict, - job_id: str, - queue_name: str, - priority: int = 0, - retry_count: int = 0, - ) -> None: - """Evaluate the worker-dispatch predicate; defer or cancel as needed. - - ``Defer`` (or ``False`` with ``on_false="defer"``) re-enqueues a - fresh job with the same payload and a delay, then raises - :class:`TaskCancelledError` so the current execution is marked - cancelled by the Rust runner. ``Cancel`` (or ``False`` with - ``on_false="cancel"``) raises :class:`TaskCancelledError` directly - without re-enqueueing. - - Returns silently when the predicate allows. - """ - predicate = self._task_predicates.get(task_name) - if predicate is None: - return - - ctx = PredicateContext.for_dispatch( - task_name=task_name, - queue=queue_name, - priority=priority, - retry_count=retry_count, - args=tuple(args), - kwargs=dict(kwargs), - job_id=job_id, - payload_size=0, - extras=self._task_predicate_extras.get(task_name), - queue_ref=self, - ) - outcome = evaluate_predicate(predicate, ctx, metrics=self._predicate_metrics) - - if outcome is True: - return - - if isinstance(outcome, Cancel): - self._emit_event( - EventType.PREDICATE_CANCELLED, - { - "task_name": task_name, - "job_id": job_id, - "queue": queue_name, - "reason": outcome.reason, - "phase": "dispatch", - }, - ) - raise TaskCancelledError( - f"predicate cancelled job {job_id}: {outcome.reason}" - if outcome.reason - else f"predicate cancelled job {job_id}" - ) - - if isinstance(outcome, Defer): - self._reenqueue_after_defer( - task_name=task_name, - args=args, - kwargs=kwargs, - queue_name=queue_name, - delay_seconds=outcome.seconds, - ) - self._emit_event( - EventType.PREDICATE_DEFERRED, - { - "task_name": task_name, - "job_id": job_id, - "queue": queue_name, - "defer_seconds": outcome.seconds, - "phase": "dispatch", - }, - ) - raise TaskCancelledError(f"predicate deferred job {job_id} by {outcome.seconds:.1f}s") - - # Plain False — branch on on_false. 
- action = self._task_predicate_on_false.get(task_name, "defer") - if action == "cancel": - self._emit_event( - EventType.PREDICATE_CANCELLED, - { - "task_name": task_name, - "job_id": job_id, - "queue": queue_name, - "phase": "dispatch", - }, - ) - raise TaskCancelledError(f"predicate rejected job {job_id}") - - defer_seconds = self._task_default_defer.get(task_name, 60.0) - self._reenqueue_after_defer( - task_name=task_name, - args=args, - kwargs=kwargs, - queue_name=queue_name, - delay_seconds=defer_seconds, - ) - self._emit_event( - EventType.PREDICATE_DEFERRED, - { - "task_name": task_name, - "job_id": job_id, - "queue": queue_name, - "defer_seconds": defer_seconds, - "phase": "dispatch", - }, - ) - raise TaskCancelledError(f"predicate deferred job {job_id}") - - def _reenqueue_after_defer( - self, - *, - task_name: str, - args: tuple, - kwargs: dict, - queue_name: str, - delay_seconds: float, - ) -> None: - """Re-enqueue a job with a delay, bypassing predicate re-evaluation. - - Args/kwargs are serialized fresh via the task's serializer. We go - straight to the Rust queue to avoid running enqueue-time - middleware or re-evaluating the predicate (which would create an - infinite ping-pong). - """ - serializer = self._get_serializer(task_name) - payload = serializer.dumps((tuple(args), dict(kwargs))) - self._inner.enqueue( - task_name=task_name, - payload=payload, - queue=queue_name, - delay_seconds=delay_seconds, - ) - - def _apply_enqueue_predicate( - self, - *, - predicate: Any, - task_name: str, - queue_name: str, - priority: int | None, - args: tuple, - kwargs: dict, - payload_size: int, - delay: float | None, - ) -> float | None: - """Evaluate an enqueue-time predicate; return adjusted ``delay``. - - Raises :class:`~taskito.exceptions.PredicateRejectedError` when the - outcome is a :class:`~taskito.predicates.Cancel`, or a plain - ``False`` paired with ``on_false="cancel"``. Returns the (possibly - bumped) ``delay`` for the caller to pass through to the Rust - ``enqueue``. - """ - ctx = PredicateContext.for_enqueue( - task_name=task_name, - queue=queue_name, - priority=priority, - args=tuple(args), - kwargs=dict(kwargs), - payload_size=payload_size, - delay_seconds=delay, - extras=self._task_predicate_extras.get(task_name), - queue_ref=self, - ) - outcome = evaluate_predicate(predicate, ctx, metrics=self._predicate_metrics) - - if outcome is True: - return delay - if isinstance(outcome, Defer): - self._emit_event( - EventType.PREDICATE_DEFERRED, - { - "task_name": task_name, - "queue": queue_name, - "defer_seconds": outcome.seconds, - "phase": "enqueue", - }, - ) - return (delay or 0.0) + outcome.seconds - if isinstance(outcome, Cancel): - self._emit_event( - EventType.PREDICATE_REJECTED, - { - "task_name": task_name, - "queue": queue_name, - "reason": outcome.reason, - "phase": "enqueue", - }, - ) - raise PredicateRejectedError(task_name, outcome.reason) - - # Plain False — branch on the task's on_false setting. 
- action = self._task_predicate_on_false.get(task_name, "defer") - if action == "cancel": - self._emit_event( - EventType.PREDICATE_REJECTED, - {"task_name": task_name, "queue": queue_name, "phase": "enqueue"}, - ) - raise PredicateRejectedError(task_name) - # defer - defer_seconds = self._task_default_defer.get(task_name, 60.0) - self._emit_event( - EventType.PREDICATE_DEFERRED, - { - "task_name": task_name, - "queue": queue_name, - "defer_seconds": defer_seconds, - "phase": "enqueue", - }, - ) - return (delay or 0.0) + defer_seconds - def enqueue( self, task_name: str, diff --git a/py_src/taskito/mixins/__init__.py b/py_src/taskito/mixins/__init__.py index cad14a0..f9b07ba 100644 --- a/py_src/taskito/mixins/__init__.py +++ b/py_src/taskito/mixins/__init__.py @@ -6,7 +6,9 @@ from taskito.mixins.lifecycle import QueueLifecycleMixin from taskito.mixins.locks import QueueLockMixin from taskito.mixins.operations import QueueOperationsMixin +from taskito.mixins.predicates import QueuePredicateMixin from taskito.mixins.resources import QueueResourceMixin +from taskito.mixins.runtime_config import QueueRuntimeConfigMixin from taskito.mixins.settings import QueueSettingsMixin __all__ = [ @@ -16,6 +18,8 @@ "QueueLifecycleMixin", "QueueLockMixin", "QueueOperationsMixin", + "QueuePredicateMixin", "QueueResourceMixin", + "QueueRuntimeConfigMixin", "QueueSettingsMixin", ] diff --git a/py_src/taskito/mixins/predicates.py b/py_src/taskito/mixins/predicates.py new file mode 100644 index 0000000..193162c --- /dev/null +++ b/py_src/taskito/mixins/predicates.py @@ -0,0 +1,336 @@ +"""Predicate registration, inspection, and gating (enqueue + dispatch). + +All predicate state and behaviour lives here. The Queue composes this +mixin alongside :class:`~taskito.mixins.decorators.QueueDecoratorMixin`, +which handles wiring predicates into ``@queue.task`` and triggering the +worker-side dispatch gate during ``_wrap_task``. + +The two main flows are: + +* **Enqueue gate** — :meth:`_apply_enqueue_predicate` runs before the + Rust ``enqueue`` call (from ``Queue.enqueue`` / ``Queue.enqueue_many``). + It returns a (possibly bumped) ``delay`` and raises + :class:`~taskito.exceptions.PredicateRejectedError` when the predicate + cancels the enqueue. +* **Dispatch gate** — :meth:`_apply_dispatch_predicate` runs on the + worker, called from ``QueueDecoratorMixin._wrap_task`` before the + task body executes. ``Defer`` re-enqueues a fresh job via + :meth:`_reenqueue_after_defer` and raises + :class:`~taskito.exceptions.TaskCancelledError`. ``Cancel`` raises + the same exception without re-enqueueing. + +Both gates emit the appropriate ``PREDICATE_*`` events through +``self._emit_event`` and record outcomes on ``self._predicate_metrics``. 
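+
+A minimal sketch of how a task opts in (task name and body are
+illustrative; ``is_business_hours`` is one of the built-in recipes)::
+
+    from taskito.predicates import is_business_hours
+
+    @queue.task(predicate=is_business_hours(tz="UTC"), on_false="defer")
+    def send_digest(user_id: int) -> None:
+        ...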
+""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING, Any + +from taskito.events import EventType +from taskito.exceptions import PredicateRejectedError, TaskCancelledError +from taskito.predicates.context import PredicateContext +from taskito.predicates.core import Predicate as _Predicate +from taskito.predicates.evaluate import evaluate_predicate +from taskito.predicates.metrics import PredicateMetrics +from taskito.predicates.outcomes import Cancel, Defer +from taskito.predicates.registry import PredicateValidationError +from taskito.predicates.registry import ( + register_predicate as _register_predicate, +) + +if TYPE_CHECKING: + from taskito._taskito import PyQueue + from taskito.serializers import Serializer + + +class QueuePredicateMixin: + """Public predicate API + internal enqueue/dispatch gating.""" + + _inner: PyQueue + _task_predicates: dict[str, _Predicate] + _task_predicate_on_false: dict[str, str] + _task_predicate_extras: dict[str, dict[str, Any]] + _task_default_defer: dict[str, float] + _task_predicate_serialized: dict[str, dict[str, Any] | None] + _predicate_metrics: PredicateMetrics + + # Supplied by other mixins on the composed Queue. + _emit_event: Callable[..., None] + _get_serializer: Callable[[str], Serializer] + + def _init_predicate_state(self) -> None: + """Initialise predicate-related instance state. + + Invoked once from ``Queue.__init__``. Kept on the mixin so the + full set of predicate fields stays in one place. + """ + self._task_predicates = {} + self._task_predicate_on_false = {} + self._task_predicate_extras = {} + self._task_default_defer = {} + self._task_predicate_serialized = {} + self._predicate_metrics = PredicateMetrics() + + # -- Inspection / registration --------------------------------------- + + def list_predicates(self) -> dict[str, dict[str, Any] | None]: + """Return the serialized predicate (or ``None`` for bare callables) + registered for every task that has one. + + The values are JSON-safe dicts produced by + :meth:`Predicate.to_dict`. Consumers (dashboard, audit logs) can + feed each value back through :meth:`Predicate.from_dict` to + rebuild the AST. + """ + return dict(self._task_predicate_serialized) + + def predicate_for(self, task_name: str) -> dict[str, Any] | None: + """Return the serialized predicate for ``task_name`` or ``None``.""" + return self._task_predicate_serialized.get(task_name) + + def register_predicate(self, op: str, *, replace: bool = False) -> Callable[[type], type]: + """Class decorator: register a custom :class:`Predicate` subclass. + + Example:: + + from taskito.predicates import Predicate + + @queue.register_predicate("tenant_quota_under") + class TenantQuotaUnder(Predicate): + OP = "tenant_quota_under" + ... + + The ``OP`` set on the class must match ``op``. Once registered, + the predicate participates in JSON serialization and the string + DSL just like a built-in recipe. 
+ """ + + def decorator(cls: type) -> type: + if not isinstance(cls, type) or not issubclass(cls, _Predicate): + raise PredicateValidationError( + f"register_predicate target must subclass Predicate; got {cls!r}" + ) + declared = cls.__dict__.get("OP") + if declared and declared != op: + raise PredicateValidationError( + f"OP mismatch: decorator says {op!r}, class declares {declared!r}" + ) + cls.OP = op + _register_predicate(op, cls, replace=replace) + return cls + + return decorator + + # -- Dispatch-time gate ---------------------------------------------- + + def _apply_dispatch_predicate( + self, + *, + task_name: str, + args: tuple, + kwargs: dict, + job_id: str, + queue_name: str, + priority: int = 0, + retry_count: int = 0, + ) -> None: + """Evaluate the worker-dispatch predicate; defer or cancel as needed. + + ``Defer`` (or ``False`` with ``on_false="defer"``) re-enqueues a + fresh job with the same payload and a delay, then raises + :class:`TaskCancelledError` so the current execution is marked + cancelled by the Rust runner. ``Cancel`` (or ``False`` with + ``on_false="cancel"``) raises :class:`TaskCancelledError` directly + without re-enqueueing. + + Returns silently when the predicate allows. + """ + predicate = self._task_predicates.get(task_name) + if predicate is None: + return + + ctx = PredicateContext.for_dispatch( + task_name=task_name, + queue=queue_name, + priority=priority, + retry_count=retry_count, + args=tuple(args), + kwargs=dict(kwargs), + job_id=job_id, + payload_size=0, + extras=self._task_predicate_extras.get(task_name), + queue_ref=self, # type: ignore[arg-type] + ) + outcome = evaluate_predicate(predicate, ctx, metrics=self._predicate_metrics) + + if outcome is True: + return + + if isinstance(outcome, Cancel): + self._emit_dispatch_cancel(task_name, job_id, queue_name, outcome.reason) + raise TaskCancelledError(_dispatch_cancel_message(job_id, outcome.reason)) + + if isinstance(outcome, Defer): + self._reenqueue_after_defer( + task_name=task_name, + args=args, + kwargs=kwargs, + queue_name=queue_name, + delay_seconds=outcome.seconds, + ) + self._emit_dispatch_defer(task_name, job_id, queue_name, outcome.seconds) + raise TaskCancelledError(f"predicate deferred job {job_id} by {outcome.seconds:.1f}s") + + # Plain False — branch on the task's on_false setting. + action = self._task_predicate_on_false.get(task_name, "defer") + if action == "cancel": + self._emit_dispatch_cancel(task_name, job_id, queue_name, None) + raise TaskCancelledError(f"predicate rejected job {job_id}") + + defer_seconds = self._task_default_defer.get(task_name, 60.0) + self._reenqueue_after_defer( + task_name=task_name, + args=args, + kwargs=kwargs, + queue_name=queue_name, + delay_seconds=defer_seconds, + ) + self._emit_dispatch_defer(task_name, job_id, queue_name, defer_seconds) + raise TaskCancelledError(f"predicate deferred job {job_id}") + + def _reenqueue_after_defer( + self, + *, + task_name: str, + args: tuple, + kwargs: dict, + queue_name: str, + delay_seconds: float, + ) -> None: + """Re-enqueue a job with a delay, bypassing predicate re-evaluation. + + Args/kwargs are serialized fresh via the task's serializer. We go + straight to the Rust queue to avoid running enqueue-time + middleware or re-evaluating the predicate (which would create an + infinite ping-pong). 
+ """ + serializer = self._get_serializer(task_name) + payload = serializer.dumps((tuple(args), dict(kwargs))) + self._inner.enqueue( + task_name=task_name, + payload=payload, + queue=queue_name, + delay_seconds=delay_seconds, + ) + + # -- Enqueue-time gate ----------------------------------------------- + + def _apply_enqueue_predicate( + self, + *, + predicate: Any, + task_name: str, + queue_name: str, + priority: int | None, + args: tuple, + kwargs: dict, + payload_size: int, + delay: float | None, + ) -> float | None: + """Evaluate an enqueue-time predicate; return adjusted ``delay``. + + Raises :class:`~taskito.exceptions.PredicateRejectedError` when the + outcome is a :class:`~taskito.predicates.Cancel`, or a plain + ``False`` paired with ``on_false="cancel"``. Returns the (possibly + bumped) ``delay`` for the caller to pass through to the Rust + ``enqueue``. + """ + ctx = PredicateContext.for_enqueue( + task_name=task_name, + queue=queue_name, + priority=priority, + args=tuple(args), + kwargs=dict(kwargs), + payload_size=payload_size, + delay_seconds=delay, + extras=self._task_predicate_extras.get(task_name), + queue_ref=self, # type: ignore[arg-type] + ) + outcome = evaluate_predicate(predicate, ctx, metrics=self._predicate_metrics) + + if outcome is True: + return delay + if isinstance(outcome, Defer): + self._emit_enqueue_defer(task_name, queue_name, outcome.seconds) + return (delay or 0.0) + outcome.seconds + if isinstance(outcome, Cancel): + self._emit_enqueue_reject(task_name, queue_name, outcome.reason) + raise PredicateRejectedError(task_name, outcome.reason) + + # Plain False — branch on the task's on_false setting. + action = self._task_predicate_on_false.get(task_name, "defer") + if action == "cancel": + self._emit_enqueue_reject(task_name, queue_name, None) + raise PredicateRejectedError(task_name) + + defer_seconds = self._task_default_defer.get(task_name, 60.0) + self._emit_enqueue_defer(task_name, queue_name, defer_seconds) + return (delay or 0.0) + defer_seconds + + # -- Event emission helpers ------------------------------------------ + + def _emit_dispatch_cancel( + self, task_name: str, job_id: str, queue_name: str, reason: str | None + ) -> None: + payload: dict[str, Any] = { + "task_name": task_name, + "job_id": job_id, + "queue": queue_name, + "phase": "dispatch", + } + if reason: + payload["reason"] = reason + self._emit_event(EventType.PREDICATE_CANCELLED, payload) + + def _emit_dispatch_defer( + self, task_name: str, job_id: str, queue_name: str, defer_seconds: float + ) -> None: + self._emit_event( + EventType.PREDICATE_DEFERRED, + { + "task_name": task_name, + "job_id": job_id, + "queue": queue_name, + "defer_seconds": defer_seconds, + "phase": "dispatch", + }, + ) + + def _emit_enqueue_defer(self, task_name: str, queue_name: str, defer_seconds: float) -> None: + self._emit_event( + EventType.PREDICATE_DEFERRED, + { + "task_name": task_name, + "queue": queue_name, + "defer_seconds": defer_seconds, + "phase": "enqueue", + }, + ) + + def _emit_enqueue_reject(self, task_name: str, queue_name: str, reason: str | None) -> None: + payload: dict[str, Any] = { + "task_name": task_name, + "queue": queue_name, + "phase": "enqueue", + } + if reason: + payload["reason"] = reason + self._emit_event(EventType.PREDICATE_REJECTED, payload) + + +def _dispatch_cancel_message(job_id: str, reason: str) -> str: + if reason: + return f"predicate cancelled job {job_id}: {reason}" + return f"predicate cancelled job {job_id}" From f9551970ebe1a4b3f1ca5730ef962ede353dc855 Mon Sep 
17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 14 May 2026 01:51:41 +0530 Subject: [PATCH 4/6] refactor(mixins): split QueueRuntimeConfigMixin from decorators register_type, set_queue_rate_limit, and set_queue_concurrency are runtime configuration knobs, not decorator surface. Moving them to a dedicated QueueRuntimeConfigMixin pulls 64 LOC out of decorators.py (597 -> 533) and gives the three queue-tuning APIs a coherent home adjacent to the existing QueueSettingsMixin (dashboard key/value store). --- py_src/taskito/mixins/decorators.py | 64 ------------------ py_src/taskito/mixins/runtime_config.py | 87 +++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 64 deletions(-) create mode 100644 py_src/taskito/mixins/runtime_config.py diff --git a/py_src/taskito/mixins/decorators.py b/py_src/taskito/mixins/decorators.py index cca936d..7955df0 100644 --- a/py_src/taskito/mixins/decorators.py +++ b/py_src/taskito/mixins/decorators.py @@ -18,7 +18,6 @@ from taskito.events import EventType from taskito.inject import Inject, _InjectAlias from taskito.interception.reconstruct import reconstruct_args -from taskito.interception.strategy import Strategy as S from taskito.predicates.core import coerce_predicate from taskito.proxies import cleanup_proxies, reconstruct_proxies from taskito.task import TaskWrapper @@ -531,66 +530,3 @@ def on_failure(self, fn: Callable) -> Callable: """ self._hooks["on_failure"].append(fn) return fn - - # -- Type registration -- - - def register_type( - self, - python_type: type, - strategy: str, - *, - resource: str | None = None, - message: str | None = None, - converter: Callable | None = None, - type_key: str | None = None, - proxy_handler: str | None = None, - ) -> None: - """Register a custom type with the interception system. - - Args: - python_type: The type to register. - strategy: One of ``"pass"``, ``"convert"``, ``"redirect"``, - ``"reject"``, or ``"proxy"``. - resource: Resource name for ``"redirect"`` strategy. - message: Rejection reason for ``"reject"`` strategy. - converter: Converter callable for ``"convert"`` strategy. - type_key: Key for the converter reconstructor dispatch. - proxy_handler: Handler name for ``"proxy"`` strategy. - """ - if self._interceptor is None: - raise RuntimeError( - "Interception is disabled; set interception='strict' or " - "'lenient' to use register_type()" - ) - strat = S(strategy) - self._interceptor._registry.register( - python_type, - strat, - priority=15, - redirect_resource=resource, - reject_reason=message, - converter=converter, - type_key=type_key, - proxy_handler=proxy_handler, - ) - - # -- Queue-level config -- - - def set_queue_rate_limit(self, queue_name: str, rate_limit: str) -> None: - """Set a rate limit for an entire queue. - - Args: - queue_name: Queue name (e.g. ``"default"``). - rate_limit: Rate limit string, e.g. ``"100/m"``, ``"10/s"``. - """ - self._queue_configs.setdefault(queue_name, {})["rate_limit"] = rate_limit - - def set_queue_concurrency(self, queue_name: str, max_concurrent: int) -> None: - """Set a maximum number of concurrent jobs for a queue. - - Args: - queue_name: Queue name (e.g. ``"default"``). - max_concurrent: Maximum number of jobs running simultaneously - from this queue. 
- """ - self._queue_configs.setdefault(queue_name, {})["max_concurrent"] = max_concurrent diff --git a/py_src/taskito/mixins/runtime_config.py b/py_src/taskito/mixins/runtime_config.py new file mode 100644 index 0000000..38d4dad --- /dev/null +++ b/py_src/taskito/mixins/runtime_config.py @@ -0,0 +1,87 @@ +"""Queue-level runtime configuration: rate limits, concurrency, type registration. + +These are runtime knobs callers tune after queue construction — distinct +from the decorator-level ``@queue.task(rate_limit=...)`` (which configures +one task) and from the storage-backed dashboard settings managed by +:class:`~taskito.mixins.settings.QueueSettingsMixin`. + +Type registration lives here too because, conceptually, it is a runtime +adjustment to the argument-interception subsystem — not part of the task +decorator itself. +""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING, Any + +from taskito.interception.strategy import Strategy as S + +if TYPE_CHECKING: + from taskito.interception import ArgumentInterceptor + + +class QueueRuntimeConfigMixin: + """Rate limits, concurrency caps, and custom type registration.""" + + _queue_configs: dict[str, dict[str, Any]] + _interceptor: ArgumentInterceptor | None + + def register_type( + self, + python_type: type, + strategy: str, + *, + resource: str | None = None, + message: str | None = None, + converter: Callable | None = None, + type_key: str | None = None, + proxy_handler: str | None = None, + ) -> None: + """Register a custom type with the interception system. + + Args: + python_type: The type to register. + strategy: One of ``"pass"``, ``"convert"``, ``"redirect"``, + ``"reject"``, or ``"proxy"``. + resource: Resource name for ``"redirect"`` strategy. + message: Rejection reason for ``"reject"`` strategy. + converter: Converter callable for ``"convert"`` strategy. + type_key: Key for the converter reconstructor dispatch. + proxy_handler: Handler name for ``"proxy"`` strategy. + """ + if self._interceptor is None: + raise RuntimeError( + "Interception is disabled; set interception='strict' or " + "'lenient' to use register_type()" + ) + strat = S(strategy) + self._interceptor._registry.register( + python_type, + strat, + priority=15, + redirect_resource=resource, + reject_reason=message, + converter=converter, + type_key=type_key, + proxy_handler=proxy_handler, + ) + + def set_queue_rate_limit(self, queue_name: str, rate_limit: str) -> None: + """Set a rate limit for an entire queue. + + Args: + queue_name: Queue name (e.g. ``"default"``). + rate_limit: Rate limit string, e.g. ``"100/m"``, ``"10/s"``. + """ + self._queue_configs.setdefault(queue_name, {})["rate_limit"] = rate_limit + + def set_queue_concurrency(self, queue_name: str, max_concurrent: int) -> None: + """Set a maximum number of concurrent jobs for a queue. + + Args: + queue_name: Queue name (e.g. ``"default"``). + max_concurrent: Maximum number of jobs running simultaneously + from this queue. 
+ """ + self._queue_configs.setdefault(queue_name, {})["max_concurrent"] = max_concurrent From 7be3875f0dca85ffd3faa46df24353df5bc9a9ce Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Thu, 14 May 2026 01:51:55 +0530 Subject: [PATCH 5/6] refactor(storage): share migration DDL via diesel_common Pull ~750 LOC of duplicated CREATE TABLE / CREATE INDEX / ALTER TABLE DDL out of sqlite/mod.rs and postgres/mod.rs into a new shared module crates/taskito-core/src/storage/diesel_common/migrations.rs. A Dialect struct holds the per-backend type substitutions (BLOB vs BYTEA, INTEGER vs BIGINT, REAL vs DOUBLE PRECISION, boolean defaults, and the IF NOT EXISTS prefix on ALTER). Each backend now exposes a short driver that walks create_tables / create_indexes / alter_statements; per-backend migration_alter() retains the dialect- specific idempotency behaviour (SQLite swallows 'duplicate column', Postgres uses IF NOT EXISTS). sqlite/mod.rs: 502 -> 126 LOC; postgres/mod.rs: 508 -> 130 LOC. All cargo check / clippy / test combinations green; full Python suite still passes (592 / 9 skipped). --- .../src/storage/diesel_common/migrations.rs | 342 +++++++++++++++ .../src/storage/diesel_common/mod.rs | 1 + .../taskito-core/src/storage/postgres/mod.rs | 397 +----------------- crates/taskito-core/src/storage/sqlite/mod.rs | 393 +---------------- 4 files changed, 362 insertions(+), 771 deletions(-) create mode 100644 crates/taskito-core/src/storage/diesel_common/migrations.rs diff --git a/crates/taskito-core/src/storage/diesel_common/migrations.rs b/crates/taskito-core/src/storage/diesel_common/migrations.rs new file mode 100644 index 0000000..5d95f5b --- /dev/null +++ b/crates/taskito-core/src/storage/diesel_common/migrations.rs @@ -0,0 +1,342 @@ +//! Shared schema-migration DDL for the SQLite and Postgres backends. +//! +//! Both backends create the same logical schema; only column types and the +//! `ALTER TABLE ... ADD COLUMN` syntax differ. This module renders the DDL +//! once and substitutes per-backend type names through a [`Dialect`] table. +//! +//! Each backend's `run_migrations()` walks the three lists in order: +//! +//! 1. [`create_tables`] — `CREATE TABLE IF NOT EXISTS …` +//! 2. [`create_indexes`] — index DDL that is identical across both backends +//! 3. [`alter_statements`] — backfill `ALTER TABLE … ADD COLUMN …` for +//! databases that pre-date a column. The backend supplies its own +//! executor: SQLite swallows "duplicate column" errors, while Postgres +//! relies on `IF NOT EXISTS` and surfaces other failures as warnings. + +/// Per-backend type-name substitutions used to render shared DDL templates. +pub struct Dialect { + /// Binary blob column type. SQLite: `BLOB`. Postgres: `BYTEA`. + pub binary: &'static str, + /// 64-bit integer column type. SQLite: `INTEGER`. Postgres: `BIGINT`. + pub big_int: &'static str, + /// Boolean column rendering with a default-true clause. SQLite stores + /// booleans as `INTEGER`; Postgres uses `BOOLEAN`. + pub boolean_default_true: &'static str, + /// Boolean column rendering with a default-false clause. + pub boolean_default_false: &'static str, + /// Floating-point column type. SQLite: `REAL`. Postgres: `DOUBLE PRECISION`. + pub real: &'static str, + /// Prefix inserted between `ADD COLUMN` and the column name on + /// `ALTER TABLE` migrations. Empty for SQLite (which lacks the syntax); + /// `"IF NOT EXISTS "` for Postgres. 
+    pub alter_if_not_exists: &'static str,
+}
+
+// Each Dialect is only referenced from its own backend module, so only
+// one of these constants is reachable under any given cargo-feature set.
+// The unused-constant warning is silenced rather than wrapping each
+// constant in cargo-feature cfgs because the type itself is identical
+// across backends.
+#[allow(dead_code)]
+pub const SQLITE: Dialect = Dialect {
+    binary: "BLOB",
+    big_int: "INTEGER",
+    boolean_default_true: "INTEGER NOT NULL DEFAULT 1",
+    boolean_default_false: "INTEGER NOT NULL DEFAULT 0",
+    real: "REAL",
+    alter_if_not_exists: "",
+};
+
+#[allow(dead_code)]
+pub const POSTGRES: Dialect = Dialect {
+    binary: "BYTEA",
+    big_int: "BIGINT",
+    boolean_default_true: "BOOLEAN NOT NULL DEFAULT TRUE",
+    boolean_default_false: "BOOLEAN NOT NULL DEFAULT FALSE",
+    real: "DOUBLE PRECISION",
+    alter_if_not_exists: "IF NOT EXISTS ",
+};
+
+/// `CREATE TABLE IF NOT EXISTS` statements in dependency-safe creation order.
+///
+/// Returns owned strings — the dialect substitutes type names at call time.
+pub fn create_tables(d: &Dialect) -> Vec<String> {
+    let bin = d.binary;
+    let bi = d.big_int;
+    let real = d.real;
+    let bool_true = d.boolean_default_true;
+    let bool_false = d.boolean_default_false;
+
+    vec![
+        format!(
+            "CREATE TABLE IF NOT EXISTS jobs (
+                id TEXT PRIMARY KEY,
+                queue TEXT NOT NULL DEFAULT 'default',
+                task_name TEXT NOT NULL,
+                payload {bin} NOT NULL,
+                status INTEGER NOT NULL DEFAULT 0,
+                priority INTEGER NOT NULL DEFAULT 0,
+                created_at {bi} NOT NULL,
+                scheduled_at {bi} NOT NULL,
+                started_at {bi},
+                completed_at {bi},
+                retry_count INTEGER NOT NULL DEFAULT 0,
+                max_retries INTEGER NOT NULL DEFAULT 3,
+                result {bin},
+                error TEXT,
+                timeout_ms {bi} NOT NULL DEFAULT 300000,
+                unique_key TEXT,
+                progress INTEGER,
+                metadata TEXT,
+                cancel_requested INTEGER NOT NULL DEFAULT 0,
+                expires_at {bi},
+                result_ttl_ms {bi}
+            )"
+        ),
+        format!(
+            "CREATE TABLE IF NOT EXISTS dead_letter (
+                id TEXT PRIMARY KEY,
+                original_job_id TEXT NOT NULL,
+                queue TEXT NOT NULL,
+                task_name TEXT NOT NULL,
+                payload {bin} NOT NULL,
+                error TEXT,
+                retry_count INTEGER NOT NULL,
+                failed_at {bi} NOT NULL,
+                metadata TEXT,
+                priority INTEGER NOT NULL DEFAULT 0,
+                max_retries INTEGER NOT NULL DEFAULT 3,
+                timeout_ms {bi} NOT NULL DEFAULT 300000,
+                result_ttl_ms {bi}
+            )"
+        ),
+        format!(
+            "CREATE TABLE IF NOT EXISTS rate_limits (
+                key TEXT PRIMARY KEY,
+                tokens {real} NOT NULL,
+                max_tokens {real} NOT NULL,
+                refill_rate {real} NOT NULL,
+                last_refill {bi} NOT NULL
+            )"
+        ),
+        format!(
+            "CREATE TABLE IF NOT EXISTS periodic_tasks (
+                name TEXT PRIMARY KEY,
+                task_name TEXT NOT NULL,
+                cron_expr TEXT NOT NULL,
+                args {bin},
+                kwargs {bin},
+                queue TEXT NOT NULL DEFAULT 'default',
+                enabled {bool_true},
+                last_run {bi},
+                next_run {bi} NOT NULL
+            )"
+        ),
+        format!(
+            "CREATE TABLE IF NOT EXISTS job_errors (
+                id TEXT PRIMARY KEY,
+                job_id TEXT NOT NULL,
+                attempt INTEGER NOT NULL,
+                error TEXT NOT NULL,
+                failed_at {bi} NOT NULL
+            )"
+        ),
+        "CREATE TABLE IF NOT EXISTS job_dependencies (
+            id TEXT PRIMARY KEY,
+            job_id TEXT NOT NULL,
+            depends_on_job_id TEXT NOT NULL
+        )"
+        .to_string(),
+        format!(
+            "CREATE TABLE IF NOT EXISTS task_metrics (
+                id TEXT PRIMARY KEY,
+                task_name TEXT NOT NULL,
+                job_id TEXT NOT NULL,
+                wall_time_ns {bi} NOT NULL,
+                memory_bytes {bi} NOT NULL DEFAULT 0,
+                succeeded {bool_true},
+                recorded_at {bi} NOT NULL
+            )"
+        ),
+        format!(
+            "CREATE TABLE IF NOT EXISTS replay_history (
+                id TEXT PRIMARY KEY,
+                original_job_id TEXT NOT
NULL, + replay_job_id TEXT NOT NULL, + replayed_at {bi} NOT NULL, + original_result {bin}, + replay_result {bin}, + original_error TEXT, + replay_error TEXT + )" + ), + format!( + "CREATE TABLE IF NOT EXISTS task_logs ( + id TEXT PRIMARY KEY, + job_id TEXT NOT NULL, + task_name TEXT NOT NULL, + level TEXT NOT NULL DEFAULT 'info', + message TEXT NOT NULL, + extra TEXT, + logged_at {bi} NOT NULL + )" + ), + format!( + "CREATE TABLE IF NOT EXISTS circuit_breakers ( + task_name TEXT PRIMARY KEY, + state INTEGER NOT NULL DEFAULT 0, + failure_count INTEGER NOT NULL DEFAULT 0, + last_failure_at {bi}, + opened_at {bi}, + half_open_at {bi}, + threshold INTEGER NOT NULL DEFAULT 5, + window_ms {bi} NOT NULL DEFAULT 60000, + cooldown_ms {bi} NOT NULL DEFAULT 300000, + half_open_max_probes INTEGER NOT NULL DEFAULT 5, + half_open_success_rate {real} NOT NULL DEFAULT 0.8, + half_open_probe_count INTEGER NOT NULL DEFAULT 0, + half_open_success_count INTEGER NOT NULL DEFAULT 0, + half_open_failure_count INTEGER NOT NULL DEFAULT 0 + )" + ), + format!( + "CREATE TABLE IF NOT EXISTS workers ( + worker_id TEXT PRIMARY KEY, + last_heartbeat {bi} NOT NULL, + queues TEXT NOT NULL DEFAULT 'default', + status TEXT NOT NULL DEFAULT 'active' + )" + ), + format!( + "CREATE TABLE IF NOT EXISTS queue_state ( + queue_name TEXT PRIMARY KEY, + paused {bool_false}, + paused_at {bi} + )" + ), + format!( + "CREATE TABLE IF NOT EXISTS archived_jobs ( + id TEXT PRIMARY KEY, + queue TEXT NOT NULL DEFAULT 'default', + task_name TEXT NOT NULL, + payload {bin} NOT NULL, + status INTEGER NOT NULL DEFAULT 0, + priority INTEGER NOT NULL DEFAULT 0, + created_at {bi} NOT NULL, + scheduled_at {bi} NOT NULL, + started_at {bi}, + completed_at {bi}, + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 3, + result {bin}, + error TEXT, + timeout_ms {bi} NOT NULL DEFAULT 300000, + unique_key TEXT, + progress INTEGER, + metadata TEXT, + cancel_requested INTEGER NOT NULL DEFAULT 0, + expires_at {bi}, + result_ttl_ms {bi} + )" + ), + format!( + "CREATE TABLE IF NOT EXISTS distributed_locks ( + lock_name TEXT PRIMARY KEY, + owner_id TEXT NOT NULL, + acquired_at {bi} NOT NULL, + expires_at {bi} NOT NULL + )" + ), + format!( + "CREATE TABLE IF NOT EXISTS execution_claims ( + job_id TEXT PRIMARY KEY, + worker_id TEXT NOT NULL, + claimed_at {bi} NOT NULL + )" + ), + format!( + "CREATE TABLE IF NOT EXISTS dashboard_settings ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at {bi} NOT NULL + )" + ), + ] +} + +/// `CREATE INDEX IF NOT EXISTS` statements — identical across both backends. 
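+///
+/// No dialect substitution is needed here, unlike [`create_tables`] and
+/// [`alter_statements`], so a static slice suffices.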
+pub fn create_indexes() -> &'static [&'static str] {
+    &[
+        "CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
+         ON jobs(queue, status, priority DESC, scheduled_at)",
+        "CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)",
+        "CREATE UNIQUE INDEX IF NOT EXISTS idx_jobs_unique_key
+         ON jobs(unique_key) WHERE unique_key IS NOT NULL AND status IN (0, 1)",
+        "CREATE INDEX IF NOT EXISTS idx_job_errors_job_id ON job_errors(job_id)",
+        "CREATE INDEX IF NOT EXISTS idx_job_deps_job_id ON job_dependencies(job_id)",
+        "CREATE INDEX IF NOT EXISTS idx_job_deps_depends_on ON job_dependencies(depends_on_job_id)",
+        "CREATE INDEX IF NOT EXISTS idx_task_metrics_task_name ON task_metrics(task_name)",
+        "CREATE INDEX IF NOT EXISTS idx_task_metrics_recorded_at ON task_metrics(recorded_at)",
+        "CREATE INDEX IF NOT EXISTS idx_replay_original ON replay_history(original_job_id)",
+        "CREATE INDEX IF NOT EXISTS idx_task_logs_job_id ON task_logs(job_id)",
+        "CREATE INDEX IF NOT EXISTS idx_task_logs_recorded ON task_logs(logged_at)",
+        "CREATE INDEX IF NOT EXISTS idx_archived_jobs_completed ON archived_jobs(completed_at)",
+        "CREATE INDEX IF NOT EXISTS idx_distributed_locks_expires ON distributed_locks(expires_at)",
+        "CREATE INDEX IF NOT EXISTS idx_execution_claims_claimed ON execution_claims(claimed_at)",
+    ]
+}
+
+/// `ALTER TABLE … ADD COLUMN …` statements used to backfill schemas that
+/// pre-date a column. Each backend supplies its own executor that handles
+/// "column already exists" idempotently.
+pub fn alter_statements(d: &Dialect) -> Vec<String> {
+    let bi = d.big_int;
+    let real = d.real;
+    let ife = d.alter_if_not_exists;
+
+    vec![
+        // jobs ── cancel_requested / expires_at / result_ttl_ms
+        format!("ALTER TABLE jobs ADD COLUMN {ife}cancel_requested INTEGER NOT NULL DEFAULT 0"),
+        format!("ALTER TABLE jobs ADD COLUMN {ife}expires_at {bi}"),
+        format!("ALTER TABLE jobs ADD COLUMN {ife}result_ttl_ms {bi}"),
+        // dead_letter retry / timeout / ttl backfill
+        format!("ALTER TABLE dead_letter ADD COLUMN {ife}priority INTEGER NOT NULL DEFAULT 0"),
+        format!("ALTER TABLE dead_letter ADD COLUMN {ife}max_retries INTEGER NOT NULL DEFAULT 3"),
+        format!(
+            "ALTER TABLE dead_letter ADD COLUMN {ife}timeout_ms {bi} NOT NULL DEFAULT 300000"
+        ),
+        format!("ALTER TABLE dead_letter ADD COLUMN {ife}result_ttl_ms {bi}"),
+        // workers metadata backfill
+        format!("ALTER TABLE workers ADD COLUMN {ife}tags TEXT"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}resources TEXT"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}resource_health TEXT"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}threads INTEGER NOT NULL DEFAULT 0"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}started_at {bi}"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}hostname TEXT"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}pid INTEGER"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}pool_type TEXT"),
+        // periodic_tasks timezone
+        format!("ALTER TABLE periodic_tasks ADD COLUMN {ife}timezone TEXT"),
+        // jobs namespace
+        format!("ALTER TABLE jobs ADD COLUMN {ife}namespace TEXT"),
+        // sample-based half-open circuit-breaker probe counters
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN {ife}half_open_max_probes INTEGER NOT NULL DEFAULT 5"
+        ),
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN {ife}half_open_success_rate {real} NOT NULL DEFAULT 0.8"
+        ),
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN {ife}half_open_probe_count INTEGER NOT NULL DEFAULT 0"
+        ),
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN
+pub fn alter_statements(d: &Dialect) -> Vec<String> {
+    let bi = d.big_int;
+    let real = d.real;
+    let ife = d.alter_if_not_exists;
+
+    vec![
+        // jobs ── cancel_requested / expires_at / result_ttl_ms
+        format!("ALTER TABLE jobs ADD COLUMN {ife}cancel_requested INTEGER NOT NULL DEFAULT 0"),
+        format!("ALTER TABLE jobs ADD COLUMN {ife}expires_at {bi}"),
+        format!("ALTER TABLE jobs ADD COLUMN {ife}result_ttl_ms {bi}"),
+        // dead_letter retry / timeout / ttl backfill
+        format!("ALTER TABLE dead_letter ADD COLUMN {ife}priority INTEGER NOT NULL DEFAULT 0"),
+        format!("ALTER TABLE dead_letter ADD COLUMN {ife}max_retries INTEGER NOT NULL DEFAULT 3"),
+        format!(
+            "ALTER TABLE dead_letter ADD COLUMN {ife}timeout_ms {bi} NOT NULL DEFAULT 300000"
+        ),
+        format!("ALTER TABLE dead_letter ADD COLUMN {ife}result_ttl_ms {bi}"),
+        // workers metadata backfill
+        format!("ALTER TABLE workers ADD COLUMN {ife}tags TEXT"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}resources TEXT"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}resource_health TEXT"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}threads INTEGER NOT NULL DEFAULT 0"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}started_at {bi}"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}hostname TEXT"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}pid INTEGER"),
+        format!("ALTER TABLE workers ADD COLUMN {ife}pool_type TEXT"),
+        // periodic_tasks timezone
+        format!("ALTER TABLE periodic_tasks ADD COLUMN {ife}timezone TEXT"),
+        // jobs namespace
+        format!("ALTER TABLE jobs ADD COLUMN {ife}namespace TEXT"),
+        // sample-based half-open circuit-breaker probe counters
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN {ife}half_open_max_probes INTEGER NOT NULL DEFAULT 5"
+        ),
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN {ife}half_open_success_rate {real} NOT NULL DEFAULT 0.8"
+        ),
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN {ife}half_open_probe_count INTEGER NOT NULL DEFAULT 0"
+        ),
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN {ife}half_open_success_count INTEGER NOT NULL DEFAULT 0"
+        ),
+        format!(
+            "ALTER TABLE circuit_breakers ADD COLUMN {ife}half_open_failure_count INTEGER NOT NULL DEFAULT 0"
+        ),
+        // namespace backfill on dead_letter / archived_jobs (after circuit-breaker columns)
+        format!("ALTER TABLE dead_letter ADD COLUMN {ife}namespace TEXT"),
+        format!("ALTER TABLE archived_jobs ADD COLUMN {ife}namespace TEXT"),
+    ]
+}
diff --git a/crates/taskito-core/src/storage/diesel_common/mod.rs b/crates/taskito-core/src/storage/diesel_common/mod.rs
index 19fe4b5..70eae41 100644
--- a/crates/taskito-core/src/storage/diesel_common/mod.rs
+++ b/crates/taskito-core/src/storage/diesel_common/mod.rs
@@ -7,6 +7,7 @@ mod jobs;
 mod locks;
 mod logs;
 mod metrics;
+pub(crate) mod migrations;
 
 pub(crate) use jobs::impl_diesel_job_ops;
 pub(crate) use locks::impl_diesel_lock_ops;
diff --git a/crates/taskito-core/src/storage/postgres/mod.rs b/crates/taskito-core/src/storage/postgres/mod.rs
index 14f6ee6..9440445 100644
--- a/crates/taskito-core/src/storage/postgres/mod.rs
+++ b/crates/taskito-core/src/storage/postgres/mod.rs
@@ -17,6 +17,7 @@ use diesel::prelude::*;
 use diesel::r2d2::{ConnectionManager, Pool};
 
 use crate::error::Result;
+use crate::storage::diesel_common::migrations as common_migrations;
 
 /// Run an ALTER TABLE migration, logging a warning on any failure.
 /// Postgres migrations use IF NOT EXISTS, so any error is genuinely unexpected.
@@ -110,397 +111,19 @@ impl PostgresStorage {
     fn run_migrations(&self) -> Result<()> {
         let mut conn = self.conn()?;
 
-        // Ensure the schema exists before creating tables
+        // Postgres-only: ensure the target schema exists before any DDL runs.
         diesel::sql_query(format!("CREATE SCHEMA IF NOT EXISTS {}", self.schema))
             .execute(&mut conn)?;
 
-        // Use PG-native types: TEXT, BYTEA, BIGINT, INTEGER, BOOLEAN, DOUBLE PRECISION
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS jobs (
-                id TEXT PRIMARY KEY,
-                queue TEXT NOT NULL DEFAULT 'default',
-                task_name TEXT NOT NULL,
-                payload BYTEA NOT NULL,
-                status INTEGER NOT NULL DEFAULT 0,
-                priority INTEGER NOT NULL DEFAULT 0,
-                created_at BIGINT NOT NULL,
-                scheduled_at BIGINT NOT NULL,
-                started_at BIGINT,
-                completed_at BIGINT,
-                retry_count INTEGER NOT NULL DEFAULT 0,
-                max_retries INTEGER NOT NULL DEFAULT 3,
-                result BYTEA,
-                error TEXT,
-                timeout_ms BIGINT NOT NULL DEFAULT 300000,
-                unique_key TEXT,
-                progress INTEGER,
-                metadata TEXT,
-                cancel_requested INTEGER NOT NULL DEFAULT 0,
-                expires_at BIGINT,
-                result_ttl_ms BIGINT
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
-             ON jobs(queue, status, priority DESC, scheduled_at)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)")
-            .execute(&mut conn)?;
-
-        // PG partial unique index
-        diesel::sql_query(
-            "CREATE UNIQUE INDEX IF NOT EXISTS idx_jobs_unique_key
-             ON jobs(unique_key) WHERE unique_key IS NOT NULL AND status IN (0, 1)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS dead_letter (
-                id TEXT PRIMARY KEY,
-                original_job_id TEXT NOT NULL,
-                queue TEXT NOT NULL,
-                task_name TEXT NOT NULL,
-                payload BYTEA NOT NULL,
-                error TEXT,
-                retry_count INTEGER NOT NULL,
-                failed_at BIGINT NOT NULL,
-                metadata TEXT,
-                priority INTEGER NOT NULL DEFAULT 0,
-                max_retries INTEGER NOT NULL DEFAULT 3,
-                timeout_ms BIGINT NOT NULL DEFAULT 300000,
-                result_ttl_ms BIGINT
-            )",
-        )
-        .execute(&mut conn)?;
-
-        // Migration: add columns if they don't exist (for existing databases)
-        for col in &[
-            "ALTER TABLE dead_letter ADD COLUMN IF NOT EXISTS priority INTEGER NOT NULL DEFAULT 0",
-            "ALTER TABLE dead_letter ADD COLUMN IF NOT EXISTS max_retries INTEGER NOT NULL DEFAULT 3",
-            "ALTER TABLE dead_letter ADD COLUMN IF NOT EXISTS timeout_ms BIGINT NOT NULL DEFAULT 300000",
-            "ALTER TABLE dead_letter ADD COLUMN IF NOT EXISTS result_ttl_ms BIGINT",
-        ] {
-            migration_alter(&mut conn, col);
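+        // Shared DDL from diesel_common::migrations, rendered with the
+        // Postgres dialect: tables first, then indexes, then column backfills.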
+        for sql in common_migrations::create_tables(&common_migrations::POSTGRES) {
+            diesel::sql_query(&sql).execute(&mut conn)?;
+        }
+        for sql in common_migrations::create_indexes() {
+            diesel::sql_query(*sql).execute(&mut conn)?;
+        }
+        for sql in common_migrations::alter_statements(&common_migrations::POSTGRES) {
+            migration_alter(&mut conn, &sql);
         }
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS rate_limits (
-                key TEXT PRIMARY KEY,
-                tokens DOUBLE PRECISION NOT NULL,
-                max_tokens DOUBLE PRECISION NOT NULL,
-                refill_rate DOUBLE PRECISION NOT NULL,
-                last_refill BIGINT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS periodic_tasks (
-                name TEXT PRIMARY KEY,
-                task_name TEXT NOT NULL,
-                cron_expr TEXT NOT NULL,
-                args BYTEA,
-                kwargs BYTEA,
-                queue TEXT NOT NULL DEFAULT 'default',
-                enabled BOOLEAN NOT NULL DEFAULT TRUE,
-                last_run BIGINT,
-                next_run BIGINT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS job_errors (
-                id TEXT PRIMARY KEY,
-                job_id TEXT NOT NULL,
-                attempt INTEGER NOT NULL,
-                error TEXT NOT NULL,
-                failed_at BIGINT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query("CREATE INDEX IF NOT EXISTS idx_job_errors_job_id ON job_errors(job_id)")
-            .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS job_dependencies (
-                id TEXT PRIMARY KEY,
-                job_id TEXT NOT NULL,
-                depends_on_job_id TEXT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_job_deps_job_id ON job_dependencies(job_id)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_job_deps_depends_on ON job_dependencies(depends_on_job_id)"
-        ).execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS task_metrics (
-                id TEXT PRIMARY KEY,
-                task_name TEXT NOT NULL,
-                job_id TEXT NOT NULL,
-                wall_time_ns BIGINT NOT NULL,
-                memory_bytes BIGINT NOT NULL DEFAULT 0,
-                succeeded BOOLEAN NOT NULL DEFAULT TRUE,
-                recorded_at BIGINT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_task_metrics_task_name ON task_metrics(task_name)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_task_metrics_recorded_at ON task_metrics(recorded_at)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS replay_history (
-                id TEXT PRIMARY KEY,
-                original_job_id TEXT NOT NULL,
-                replay_job_id TEXT NOT NULL,
-                replayed_at BIGINT NOT NULL,
-                original_result BYTEA,
-                replay_result BYTEA,
-                original_error TEXT,
-                replay_error TEXT
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_replay_original ON replay_history(original_job_id)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS task_logs (
-                id TEXT PRIMARY KEY,
-                job_id TEXT NOT NULL,
-                task_name TEXT NOT NULL,
-                level TEXT NOT NULL DEFAULT 'info',
-                message TEXT NOT NULL,
-                extra TEXT,
-                logged_at BIGINT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query("CREATE INDEX IF NOT EXISTS idx_task_logs_job_id ON task_logs(job_id)")
-            .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_task_logs_recorded ON task_logs(logged_at)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS circuit_breakers (
-                task_name TEXT PRIMARY KEY,
-                state INTEGER NOT NULL DEFAULT 0,
-                failure_count INTEGER NOT NULL DEFAULT 0,
-                last_failure_at BIGINT,
-                opened_at BIGINT,
-                half_open_at BIGINT,
-                threshold INTEGER NOT NULL DEFAULT 5,
-                window_ms BIGINT NOT NULL DEFAULT 60000,
-                cooldown_ms BIGINT NOT NULL DEFAULT 300000,
-                half_open_max_probes INTEGER NOT NULL DEFAULT 5,
-                half_open_success_rate DOUBLE PRECISION NOT NULL DEFAULT 0.8,
-                half_open_probe_count INTEGER NOT NULL DEFAULT 0,
-                half_open_success_count INTEGER NOT NULL DEFAULT 0,
-                half_open_failure_count INTEGER NOT NULL DEFAULT 0
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS workers (
-                worker_id TEXT PRIMARY KEY,
-                last_heartbeat BIGINT NOT NULL,
-                queues TEXT NOT NULL DEFAULT 'default',
-                status TEXT NOT NULL DEFAULT 'active'
-            )",
-        )
-        .execute(&mut conn)?;
-
-        // Migration: add tags column to workers
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN IF NOT EXISTS tags TEXT",
-        );
-
-        // Migration: add resource advertisement columns to workers
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN IF NOT EXISTS resources TEXT",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN IF NOT EXISTS resource_health TEXT",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN IF NOT EXISTS threads INTEGER NOT NULL DEFAULT 0",
-        );
-
-        // Migration: add worker discovery metadata columns
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN IF NOT EXISTS started_at BIGINT",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN IF NOT EXISTS hostname TEXT",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN IF NOT EXISTS pid INTEGER",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN IF NOT EXISTS pool_type TEXT",
-        );
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS queue_state (
-                queue_name TEXT PRIMARY KEY,
-                paused BOOLEAN NOT NULL DEFAULT FALSE,
-                paused_at BIGINT
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS archived_jobs (
-                id TEXT PRIMARY KEY,
-                queue TEXT NOT NULL DEFAULT 'default',
-                task_name TEXT NOT NULL,
-                payload BYTEA NOT NULL,
-                status INTEGER NOT NULL DEFAULT 0,
-                priority INTEGER NOT NULL DEFAULT 0,
-                created_at BIGINT NOT NULL,
-                scheduled_at BIGINT NOT NULL,
-                started_at BIGINT,
-                completed_at BIGINT,
-                retry_count INTEGER NOT NULL DEFAULT 0,
-                max_retries INTEGER NOT NULL DEFAULT 3,
-                result BYTEA,
-                error TEXT,
-                timeout_ms BIGINT NOT NULL DEFAULT 300000,
-                unique_key TEXT,
-                progress INTEGER,
-                metadata TEXT,
-                cancel_requested INTEGER NOT NULL DEFAULT 0,
-                expires_at BIGINT,
-                result_ttl_ms BIGINT
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_archived_jobs_completed ON archived_jobs(completed_at)",
-        )
-        .execute(&mut conn)?;
-
-        // Periodic tasks timezone migration
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE periodic_tasks ADD COLUMN IF NOT EXISTS timezone TEXT",
-        );
-
-        // Migration: add namespace column to jobs
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE jobs ADD COLUMN IF NOT EXISTS namespace TEXT",
-        );
-
-        // Migration: add sample-based half-open probes to circuit breakers
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_max_probes INTEGER NOT NULL DEFAULT 5",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_success_rate DOUBLE PRECISION NOT NULL DEFAULT 0.8",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_probe_count INTEGER NOT NULL DEFAULT 0",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_success_count INTEGER NOT NULL DEFAULT 0",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN IF NOT EXISTS half_open_failure_count INTEGER NOT NULL DEFAULT 0",
-        );
-
-        // Migration: add namespace column to dead_letter and archived_jobs
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE dead_letter ADD COLUMN IF NOT EXISTS namespace TEXT",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE archived_jobs ADD COLUMN IF NOT EXISTS namespace TEXT",
-        );
-
-        // ── Distributed Locks ─────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS distributed_locks (
-                lock_name TEXT PRIMARY KEY,
-                owner_id TEXT NOT NULL,
-                acquired_at BIGINT NOT NULL,
-                expires_at BIGINT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_distributed_locks_expires ON distributed_locks(expires_at)",
-        )
-        .execute(&mut conn)?;
-
-        // ── Execution Claims ──────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS execution_claims (
-                job_id TEXT PRIMARY KEY,
-                worker_id TEXT NOT NULL,
-                claimed_at BIGINT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_execution_claims_claimed ON execution_claims(claimed_at)",
-        )
-        .execute(&mut conn)?;
-
-        // ── Dashboard Settings ───────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS dashboard_settings (
-                key TEXT PRIMARY KEY,
-                value TEXT NOT NULL,
-                updated_at BIGINT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
 
         Ok(())
     }
diff --git a/crates/taskito-core/src/storage/sqlite/mod.rs b/crates/taskito-core/src/storage/sqlite/mod.rs
index a0bbe8f..8579e6a 100644
--- a/crates/taskito-core/src/storage/sqlite/mod.rs
+++ b/crates/taskito-core/src/storage/sqlite/mod.rs
@@ -17,6 +17,7 @@ use diesel::r2d2::{ConnectionManager, CustomizeConnection, Pool};
 use diesel::sqlite::SqliteConnection;
 
 use crate::error::Result;
+use crate::storage::diesel_common::migrations as common_migrations;
 
 /// Run an ALTER TABLE migration, suppressing "duplicate column" errors (SQLite).
 fn migration_alter(conn: &mut SqliteConnection, sql: &str) {
@@ -105,391 +106,15 @@ impl SqliteStorage {
     fn run_migrations(&self) -> Result<()> {
         let mut conn = self.conn()?;
 
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS jobs (
-                id TEXT PRIMARY KEY,
-                queue TEXT NOT NULL DEFAULT 'default',
-                task_name TEXT NOT NULL,
-                payload BLOB NOT NULL,
-                status INTEGER NOT NULL DEFAULT 0,
-                priority INTEGER NOT NULL DEFAULT 0,
-                created_at INTEGER NOT NULL,
-                scheduled_at INTEGER NOT NULL,
-                started_at INTEGER,
-                completed_at INTEGER,
-                retry_count INTEGER NOT NULL DEFAULT 0,
-                max_retries INTEGER NOT NULL DEFAULT 3,
-                result BLOB,
-                error TEXT,
-                timeout_ms INTEGER NOT NULL DEFAULT 300000,
-                unique_key TEXT,
-                progress INTEGER,
-                metadata TEXT,
-                cancel_requested INTEGER NOT NULL DEFAULT 0,
-                expires_at INTEGER,
-                result_ttl_ms INTEGER
-            )",
-        )
-        .execute(&mut conn)?;
-
-        // Add new columns if they don't exist (migration for existing DBs)
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE jobs ADD COLUMN cancel_requested INTEGER NOT NULL DEFAULT 0",
-        );
-        migration_alter(&mut conn, "ALTER TABLE jobs ADD COLUMN expires_at INTEGER");
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE jobs ADD COLUMN result_ttl_ms INTEGER",
-        );
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
-             ON jobs(queue, status, priority DESC, scheduled_at)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)")
-            .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE UNIQUE INDEX IF NOT EXISTS idx_jobs_unique_key
-             ON jobs(unique_key) WHERE unique_key IS NOT NULL AND status IN (0, 1)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS dead_letter (
-                id TEXT PRIMARY KEY,
-                original_job_id TEXT NOT NULL,
-                queue TEXT NOT NULL,
-                task_name TEXT NOT NULL,
-                payload BLOB NOT NULL,
-                error TEXT,
-                retry_count INTEGER NOT NULL,
-                failed_at INTEGER NOT NULL,
-                metadata TEXT,
-                priority INTEGER NOT NULL DEFAULT 0,
-                max_retries INTEGER NOT NULL DEFAULT 3,
-                timeout_ms INTEGER NOT NULL DEFAULT 300000,
-                result_ttl_ms INTEGER
-            )",
-        )
-        .execute(&mut conn)?;
-
-        // Migration: add columns if they don't exist (for existing databases)
-        for col in &[
-            "ALTER TABLE dead_letter ADD COLUMN priority INTEGER NOT NULL DEFAULT 0",
-            "ALTER TABLE dead_letter ADD COLUMN max_retries INTEGER NOT NULL DEFAULT 3",
-            "ALTER TABLE dead_letter ADD COLUMN timeout_ms INTEGER NOT NULL DEFAULT 300000",
-            "ALTER TABLE dead_letter ADD COLUMN result_ttl_ms INTEGER",
-        ] {
-            migration_alter(&mut conn, col);
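+        // Same shared driver as Postgres; the SQLite dialect renders an empty
+        // IF NOT EXISTS prefix, so migration_alter swallows duplicate-column
+        // errors to keep the backfills idempotent.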
+        for sql in common_migrations::create_tables(&common_migrations::SQLITE) {
+            diesel::sql_query(&sql).execute(&mut conn)?;
+        }
+        for sql in common_migrations::create_indexes() {
+            diesel::sql_query(*sql).execute(&mut conn)?;
+        }
+        for sql in common_migrations::alter_statements(&common_migrations::SQLITE) {
+            migration_alter(&mut conn, &sql);
         }
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS rate_limits (
-                key TEXT PRIMARY KEY,
-                tokens REAL NOT NULL,
-                max_tokens REAL NOT NULL,
-                refill_rate REAL NOT NULL,
-                last_refill INTEGER NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS periodic_tasks (
-                name TEXT PRIMARY KEY,
-                task_name TEXT NOT NULL,
-                cron_expr TEXT NOT NULL,
-                args BLOB,
-                kwargs BLOB,
-                queue TEXT NOT NULL DEFAULT 'default',
-                enabled INTEGER NOT NULL DEFAULT 1,
-                last_run INTEGER,
-                next_run INTEGER NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS job_errors (
-                id TEXT PRIMARY KEY,
-                job_id TEXT NOT NULL,
-                attempt INTEGER NOT NULL,
-                error TEXT NOT NULL,
-                failed_at INTEGER NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query("CREATE INDEX IF NOT EXISTS idx_job_errors_job_id ON job_errors(job_id)")
-            .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS job_dependencies (
-                id TEXT PRIMARY KEY,
-                job_id TEXT NOT NULL,
-                depends_on_job_id TEXT NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_job_deps_job_id ON job_dependencies(job_id)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_job_deps_depends_on ON job_dependencies(depends_on_job_id)"
-        ).execute(&mut conn)?;
-
-        // ── Task Metrics ──────────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS task_metrics (
-                id TEXT PRIMARY KEY,
-                task_name TEXT NOT NULL,
-                job_id TEXT NOT NULL,
-                wall_time_ns INTEGER NOT NULL,
-                memory_bytes INTEGER NOT NULL DEFAULT 0,
-                succeeded INTEGER NOT NULL DEFAULT 1,
-                recorded_at INTEGER NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_task_metrics_task_name ON task_metrics(task_name)",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_task_metrics_recorded_at ON task_metrics(recorded_at)",
-        )
-        .execute(&mut conn)?;
-
-        // ── Replay History ────────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS replay_history (
-                id TEXT PRIMARY KEY,
-                original_job_id TEXT NOT NULL,
-                replay_job_id TEXT NOT NULL,
-                replayed_at INTEGER NOT NULL,
-                original_result BLOB,
-                replay_result BLOB,
-                original_error TEXT,
-                replay_error TEXT
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_replay_original ON replay_history(original_job_id)",
-        )
-        .execute(&mut conn)?;
-
-        // ── Task Logs ─────────────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS task_logs (
-                id TEXT PRIMARY KEY,
-                job_id TEXT NOT NULL,
-                task_name TEXT NOT NULL,
-                level TEXT NOT NULL DEFAULT 'info',
-                message TEXT NOT NULL,
-                extra TEXT,
-                logged_at INTEGER NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query("CREATE INDEX IF NOT EXISTS idx_task_logs_job_id ON task_logs(job_id)")
-            .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_task_logs_recorded ON task_logs(logged_at)",
-        )
-        .execute(&mut conn)?;
-
-        // ── Circuit Breakers ──────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS circuit_breakers (
-                task_name TEXT PRIMARY KEY,
-                state INTEGER NOT NULL DEFAULT 0,
-                failure_count INTEGER NOT NULL DEFAULT 0,
-                last_failure_at INTEGER,
-                opened_at INTEGER,
-                half_open_at INTEGER,
-                threshold INTEGER NOT NULL DEFAULT 5,
-                window_ms INTEGER NOT NULL DEFAULT 60000,
-                cooldown_ms INTEGER NOT NULL DEFAULT 300000,
-                half_open_max_probes INTEGER NOT NULL DEFAULT 5,
-                half_open_success_rate REAL NOT NULL DEFAULT 0.8,
-                half_open_probe_count INTEGER NOT NULL DEFAULT 0,
-                half_open_success_count INTEGER NOT NULL DEFAULT 0,
-                half_open_failure_count INTEGER NOT NULL DEFAULT 0
-            )",
-        )
-        .execute(&mut conn)?;
-
-        // ── Workers ───────────────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS workers (
-                worker_id TEXT PRIMARY KEY,
-                last_heartbeat INTEGER NOT NULL,
-                queues TEXT NOT NULL DEFAULT 'default',
-                status TEXT NOT NULL DEFAULT 'active'
-            )",
-        )
-        .execute(&mut conn)?;
-
-        // Migration: add tags column to workers
-        migration_alter(&mut conn, "ALTER TABLE workers ADD COLUMN tags TEXT");
-
-        // Migration: add resource advertisement columns to workers
-        migration_alter(&mut conn, "ALTER TABLE workers ADD COLUMN resources TEXT");
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN resource_health TEXT",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN threads INTEGER NOT NULL DEFAULT 0",
-        );
-
-        // Migration: add worker discovery metadata columns
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE workers ADD COLUMN started_at INTEGER",
-        );
-        migration_alter(&mut conn, "ALTER TABLE workers ADD COLUMN hostname TEXT");
-        migration_alter(&mut conn, "ALTER TABLE workers ADD COLUMN pid INTEGER");
-        migration_alter(&mut conn, "ALTER TABLE workers ADD COLUMN pool_type TEXT");
-
-        // ── Queue State ──────────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS queue_state (
-                queue_name TEXT PRIMARY KEY,
-                paused INTEGER NOT NULL DEFAULT 0,
-                paused_at INTEGER
-            )",
-        )
-        .execute(&mut conn)?;
-
-        // ── Archived Jobs ────────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS archived_jobs (
-                id TEXT PRIMARY KEY,
-                queue TEXT NOT NULL DEFAULT 'default',
-                task_name TEXT NOT NULL,
-                payload BLOB NOT NULL,
-                status INTEGER NOT NULL DEFAULT 0,
-                priority INTEGER NOT NULL DEFAULT 0,
-                created_at INTEGER NOT NULL,
-                scheduled_at INTEGER NOT NULL,
-                started_at INTEGER,
-                completed_at INTEGER,
-                retry_count INTEGER NOT NULL DEFAULT 0,
-                max_retries INTEGER NOT NULL DEFAULT 3,
-                result BLOB,
-                error TEXT,
-                timeout_ms INTEGER NOT NULL DEFAULT 300000,
-                unique_key TEXT,
-                progress INTEGER,
-                metadata TEXT,
-                cancel_requested INTEGER NOT NULL DEFAULT 0,
-                expires_at INTEGER,
-                result_ttl_ms INTEGER
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_archived_jobs_completed ON archived_jobs(completed_at)",
-        )
-        .execute(&mut conn)?;
-
-        // ── Periodic tasks timezone migration ────────────
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE periodic_tasks ADD COLUMN timezone TEXT",
-        );
-
-        // Migration: add namespace column to jobs
-        migration_alter(&mut conn, "ALTER TABLE jobs ADD COLUMN namespace TEXT");
-
-        // Migration: add sample-based half-open probes to circuit breakers
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN half_open_max_probes INTEGER NOT NULL DEFAULT 5",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN half_open_success_rate REAL NOT NULL DEFAULT 0.8",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN half_open_probe_count INTEGER NOT NULL DEFAULT 0",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN half_open_success_count INTEGER NOT NULL DEFAULT 0",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE circuit_breakers ADD COLUMN half_open_failure_count INTEGER NOT NULL DEFAULT 0",
-        );
-
-        // Migration: add namespace column to dead_letter and archived_jobs
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE dead_letter ADD COLUMN namespace TEXT",
-        );
-        migration_alter(
-            &mut conn,
-            "ALTER TABLE archived_jobs ADD COLUMN namespace TEXT",
-        );
-
-        // ── Distributed Locks ─────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS distributed_locks (
-                lock_name TEXT PRIMARY KEY,
-                owner_id TEXT NOT NULL,
-                acquired_at INTEGER NOT NULL,
-                expires_at INTEGER NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_distributed_locks_expires ON distributed_locks(expires_at)",
-        )
-        .execute(&mut conn)?;
-
-        // ── Execution Claims ──────────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS execution_claims (
-                job_id TEXT PRIMARY KEY,
-                worker_id TEXT NOT NULL,
-                claimed_at INTEGER NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
-
-        diesel::sql_query(
-            "CREATE INDEX IF NOT EXISTS idx_execution_claims_claimed ON execution_claims(claimed_at)",
-        )
-        .execute(&mut conn)?;
-
-        // ── Dashboard Settings ───────────────────────────
-        diesel::sql_query(
-            "CREATE TABLE IF NOT EXISTS dashboard_settings (
-                key TEXT PRIMARY KEY,
-                value TEXT NOT NULL,
-                updated_at INTEGER NOT NULL
-            )",
-        )
-        .execute(&mut conn)?;
 
         Ok(())
     }

From db85a3377dd7b807d01646828a69528c593a619b Mon Sep 17 00:00:00 2001
From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com>
Date: Thu, 14 May 2026 02:19:04 +0530
Subject: [PATCH 6/6] chore(release): bump version to 0.12.2

Bumps the four workspace crates (taskito-core, taskito-python,
taskito-workflows, taskito-async), the pyproject.toml project version,
and the importlib.metadata fallback in py_src/taskito/__init__.py.

Adds a 0.12.2 changelog entry summarising the audit-driven refactors
landing in this release: predicate inline-import fix, QueuePredicateMixin
and QueueRuntimeConfigMixin extractions, shared migration DDL via
diesel_common/migrations.rs, and the new tests/core/test_predicates.py
unit + integration suite.
---
 crates/taskito-async/Cargo.toml      |  2 +-
 crates/taskito-core/Cargo.toml       |  2 +-
 crates/taskito-python/Cargo.toml     |  2 +-
 crates/taskito-workflows/Cargo.toml  |  2 +-
 docs/content/docs/more/changelog.mdx | 21 +++++++++++++++++++++
 py_src/taskito/__init__.py           |  2 +-
 pyproject.toml                       |  2 +-
 7 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/crates/taskito-async/Cargo.toml b/crates/taskito-async/Cargo.toml
index da71bd9..cc76b24 100644
--- a/crates/taskito-async/Cargo.toml
+++ b/crates/taskito-async/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "taskito-async"
-version = "0.12.1"
+version = "0.12.2"
 edition = "2021"
 
 [dependencies]
diff --git a/crates/taskito-core/Cargo.toml b/crates/taskito-core/Cargo.toml
index b763243..c317be9 100644
--- a/crates/taskito-core/Cargo.toml
+++ b/crates/taskito-core/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "taskito-core"
-version = "0.12.1"
+version = "0.12.2"
 edition = "2021"
 
 [features]
diff --git a/crates/taskito-python/Cargo.toml b/crates/taskito-python/Cargo.toml
index ca9c4cb..1101e5e 100644
--- a/crates/taskito-python/Cargo.toml
+++ b/crates/taskito-python/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "taskito-python"
-version = "0.12.1"
+version = "0.12.2"
 edition = "2021"
 
 [features]
diff --git a/crates/taskito-workflows/Cargo.toml b/crates/taskito-workflows/Cargo.toml
index 9a57557..21ed107 100644
--- a/crates/taskito-workflows/Cargo.toml
+++ b/crates/taskito-workflows/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "taskito-workflows"
-version = "0.12.1"
+version = "0.12.2"
 edition = "2021"
 
 [dependencies]
diff --git a/docs/content/docs/more/changelog.mdx b/docs/content/docs/more/changelog.mdx
index a8a9ccc..efd8396 100644
--- a/docs/content/docs/more/changelog.mdx
+++ b/docs/content/docs/more/changelog.mdx
@@ -5,6 +5,27 @@ description: "Release history for taskito — every notable change, fix, and fea
 
 All notable changes to taskito are documented here.
 
+## 0.12.2
+
+### Fixed
+
+- **Inline imports inside the predicate boolean combinators.** `AndPredicate.evaluate`, `OrPredicate.evaluate`, and `NotPredicate.evaluate` were each importing `_resolve_outcome` from `taskito.predicates.evaluate` *inside* the method body. The cycle they were defending against doesn't exist at runtime — `evaluate.py` only references `core` under `TYPE_CHECKING` — so the import was hoisted to module scope, satisfying the project's no-inline-imports rule.
+
+### Internal
+
+- **`QueuePredicateMixin` extracted from `app.py`.** Predicate state (six instance dicts and `PredicateMetrics`) plus the three gating methods (`_apply_enqueue_predicate`, `_apply_dispatch_predicate`, `_reenqueue_after_defer`) and the public inspection / registration API (`list_predicates`, `predicate_for`, `register_predicate`) now live on a dedicated mixin under `py_src/taskito/mixins/predicates.py`. Repeated `_emit_event(...)` blocks were consolidated into small helpers. `app.py` drops from 901 to 619 LOC.
+- **`QueueRuntimeConfigMixin` split from `mixins/decorators.py`.** `register_type`, `set_queue_rate_limit`, and `set_queue_concurrency` are runtime configuration knobs, not decorator surface — they moved to a new mixin alongside the existing `QueueSettingsMixin` (which manages dashboard key/value state). `mixins/decorators.py` drops from 597 to 533 LOC.
+- **Migration DDL deduplicated via `diesel_common/migrations.rs`.** The `run_migrations()` methods on `SqliteStorage` and `PostgresStorage` shared ~750 LOC of nearly-identical `CREATE TABLE` / `CREATE INDEX` / `ALTER TABLE` statements. The shared module now exposes `create_tables(&dialect)`, `create_indexes()`, and `alter_statements(&dialect)`; a `Dialect` struct holds per-backend type substitutions (`BLOB`/`BYTEA`, `INTEGER`/`BIGINT`, `REAL`/`DOUBLE PRECISION`, boolean defaults, and the `IF NOT EXISTS` prefix on `ALTER`). Each backend's `run_migrations()` is now a ~10-line driver (an illustrative `Dialect` sketch follows this list). `sqlite/mod.rs` drops 502 → 126 LOC; `postgres/mod.rs` drops 508 → 130 LOC.
+- **`tests/core/test_predicates.py`** — 30 focused tests covering AST short-circuit semantics, fail-closed evaluation + metric recording, JSON and string-DSL round-trips, recipe behaviour (`after`/`before`/`in_time_window`/`payload_matches`/`env_var_truthy`), the callable adapter, custom predicate registration, and three queue integration tests for enqueue-time cancel/defer.
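+
+A condensed sketch of the dialect shape (illustrative only; the real
+definition lives in `diesel_common/migrations.rs`, and the field names for
+the binary and boolean slots are assumed here):
+
+```rust
+pub struct Dialect {
+    pub big_int: &'static str,             // "INTEGER" (SQLite) vs "BIGINT" (Postgres)
+    pub real: &'static str,                // "REAL" vs "DOUBLE PRECISION"
+    pub binary: &'static str,              // "BLOB" vs "BYTEA" (assumed field name)
+    pub bool_false: &'static str,          // boolean-false column default (assumed name)
+    pub alter_if_not_exists: &'static str, // "" vs "IF NOT EXISTS "
+}
+
+pub const POSTGRES: Dialect = Dialect {
+    big_int: "BIGINT",
+    real: "DOUBLE PRECISION",
+    binary: "BYTEA",
+    bool_false: "BOOLEAN NOT NULL DEFAULT FALSE",
+    alter_if_not_exists: "IF NOT EXISTS ",
+};
+```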
+
+### Test counts at release
+
+- Rust: 95 tests (default), 107 with `--features workflows`
+- Python: 592 passed, 9 skipped across 58 files
+- Dashboard (vitest): 121 tests across 9 files
+
+---
+
 ## 0.12.1
 
 ### Fixed
diff --git a/py_src/taskito/__init__.py b/py_src/taskito/__init__.py
index a1056f7..0ced0ac 100644
--- a/py_src/taskito/__init__.py
+++ b/py_src/taskito/__init__.py
@@ -103,4 +103,4 @@
     __version__ = _get_version("taskito")
 except PackageNotFoundError:
-    __version__ = "0.12.1"
+    __version__ = "0.12.2"
diff --git a/pyproject.toml b/pyproject.toml
index 78e19ee..9fdfddd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "maturin"
 
 [project]
 name = "taskito"
-version = "0.12.1"
+version = "0.12.2"
 description = "Rust-powered task queue for Python. No broker required."
 requires-python = ">=3.10"
 license = { file = "LICENSE" }