diff --git a/docs/content/docs/guides/reliability/idempotency.mdx b/docs/content/docs/guides/reliability/idempotency.mdx new file mode 100644 index 0000000..e85aa29 --- /dev/null +++ b/docs/content/docs/guides/reliability/idempotency.mdx @@ -0,0 +1,85 @@ +--- +title: Idempotency +description: "Auto-derived deduplication keys — declare a task idempotent and identical calls coalesce to a single execution." +--- + +When a task represents an action that **must not run twice** (charging a card, +sending a webhook, applying a credit), set `idempotent=True` on the task. +Taskito derives a deduplication key from the task name and serialized arguments. +Two enqueues with identical arguments while the first job is pending or running +return the **same job ID** — the second call becomes a no-op. + +## Declaring an idempotent task + +```python +@queue.task(idempotent=True) +def charge_customer(customer_id: int, amount_cents: int) -> str: + return payment_provider.charge(customer_id, amount_cents) + +job1 = charge_customer.delay(42, 1000) +job2 = charge_customer.delay(42, 1000) + +assert job1.id == job2.id # second call coalesced into the first +``` + +## How the key is derived + +Auto-keys follow the format `auto:`. Hashing +happens *after* serialization, so two calls with logically equal but +type-different arguments collide only when the serializer emits identical bytes +— the same contract that the wire payload uses. + +The dedup slot is held while the job is `pending` or `running`. Once the job +reaches a terminal state (`completed`, `failed`, `dead`), the slot is freed and +a fresh call with the same arguments creates a new job. + +## Per-call overrides + +```python +# Custom semantic key (overrides the auto-derived hash) +charge_customer.apply_async( + args=(42, 1000), + idempotency_key="invoice-2026-05-08-42", +) + +# Force a fresh execution even on an idempotent task +charge_customer.apply_async(args=(42, 1000), idempotent=False) +``` + +| Parameter | Type | Effect | +|-----------|------|--------| +| `idempotent=True` (decorator) | `bool` | Default for every `delay()` / `apply_async()` | +| `idempotency_key="…"` (call) | `str` | Explicit key. Overrides auto-derivation. | +| `idempotent=False` (call) | `bool` | Disable auto-derivation for this submission | +| `unique_key="…"` (call) | `str` | Backwards-compat alias of `idempotency_key` | + +Precedence (high to low): `unique_key` → `idempotency_key` → auto-derivation. + +## Pitfalls + +- **Don't pass moving values as arguments.** `datetime.now()`, `uuid4()`, and + similar will produce a different hash on every call, so dedup never triggers. + Move them inside the task body or pass a stable identifier instead. +- **Argument order matters** — `charge(42, 1000)` and `charge(amount=1000, customer_id=42)` + serialize differently and therefore hash differently. Be consistent at the + call site or use an explicit `idempotency_key`. +- **Serializer matters.** If you change the per-task serializer between + releases, hashes change and in-flight jobs from the old release won't dedupe + against new submissions. Use explicit `idempotency_key` for keys that must + survive deploys. +- **Test mode skips dedup.** `queue.test_mode()` runs tasks synchronously + without storage, so idempotency is a no-op there. + +## Bulk submissions + +`enqueue_many()` accepts a per-job `idempotency_keys` list, or a uniform +`idempotent=True` that auto-derives a key per row: + +```python +queue.enqueue_many( + task_name="myapp.charge_customer", + args_list=[(42, 1000), (43, 2000), (42, 1000)], + idempotent=True, +) +# Third entry collides with the first → only two distinct jobs created. +``` diff --git a/docs/content/docs/guides/reliability/meta.json b/docs/content/docs/guides/reliability/meta.json index 42eacad..132c3c9 100644 --- a/docs/content/docs/guides/reliability/meta.json +++ b/docs/content/docs/guides/reliability/meta.json @@ -4,6 +4,7 @@ "retries", "error-handling", "guarantees", + "idempotency", "rate-limiting", "circuit-breakers", "locking" diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py index 2db4d9a..6f84b10 100644 --- a/py_src/taskito/app.py +++ b/py_src/taskito/app.py @@ -16,6 +16,7 @@ from __future__ import annotations import functools +import hashlib import logging import os from collections.abc import Callable @@ -210,6 +211,7 @@ def __init__( } self._serializer: Serializer = serializer or CloudpickleSerializer() self._task_serializers: dict[str, Serializer] = {} + self._task_idempotent: dict[str, bool] = {} self._global_middleware: list[TaskMiddleware] = middleware or [] self._task_middleware: dict[str, list[TaskMiddleware]] = {} self._task_retry_filters: dict[str, dict[str, list[type[Exception]]]] = {} @@ -265,6 +267,37 @@ def _get_serializer(self, task_name: str) -> Serializer: """Get the serializer for a task (per-task or queue-level fallback).""" return self._task_serializers.get(task_name, self._serializer) + @staticmethod + def _auto_idempotency_key(task_name: str, payload: bytes) -> str: + """Derive a deterministic dedup key from task name + serialized payload.""" + digest = hashlib.sha256(task_name.encode("utf-8") + b"|" + payload).hexdigest() + return f"auto:{digest[:32]}" + + def _resolve_unique_key( + self, + task_name: str, + payload: bytes, + unique_key: str | None, + idempotency_key: str | None, + idempotent: bool | None, + ) -> str | None: + """Resolve which dedup key (if any) to send to storage. + + Precedence: explicit unique_key > explicit idempotency_key > auto-key + when the per-call ``idempotent`` flag (or per-task default) is True. + A per-call ``idempotent=False`` disables auto-derivation even if the + task is registered as idempotent. + """ + if unique_key is not None: + return unique_key + if idempotency_key is not None: + return idempotency_key + if idempotent is False: + return None + if idempotent or self._task_idempotent.get(task_name, False): + return self._auto_idempotency_key(task_name, payload) + return None + def _deserialize_payload(self, task_name: str, payload: bytes) -> tuple: """Deserialize a job payload using the per-task or queue-level serializer.""" return self._get_serializer(task_name).loads(payload) # type: ignore[no-any-return] @@ -397,6 +430,8 @@ def enqueue( depends_on: str | list[str] | None = None, expires: float | None = None, result_ttl: int | None = None, + idempotency_key: str | None = None, + idempotent: bool | None = None, ) -> JobResult: """Enqueue a task for background execution. @@ -409,11 +444,19 @@ def enqueue( queue: Target queue name. max_retries: Max retry attempts. timeout: Timeout in seconds. - unique_key: Deduplication key. + unique_key: Deduplication key (alias of ``idempotency_key`` — wins + if both are set, kept for backwards compatibility). metadata: Arbitrary JSON string to attach to the job. depends_on: Job ID or list of job IDs that must complete first. expires: Seconds until the job expires (skipped if not started by then). result_ttl: Per-job result TTL in seconds. Overrides global result_ttl. + idempotency_key: Explicit dedup key. A second enqueue with the same + key while the first job is pending or running returns the + existing job's ID instead of creating a duplicate. + idempotent: Force-on (``True``) or force-off (``False``) the + auto-derived dedup key for this single call. ``None`` (the + default) falls back to the per-task setting from + ``@queue.task(idempotent=True)``. """ final_args = args final_kwargs = kwargs or {} @@ -430,6 +473,8 @@ def enqueue( "depends_on": depends_on, "expires": expires, "result_ttl": result_ttl, + "idempotency_key": idempotency_key, + "idempotent": idempotent, } for mw in self._global_middleware: try: @@ -448,12 +493,22 @@ def enqueue( depends_on = enqueue_options.get("depends_on") expires = enqueue_options.get("expires") result_ttl = enqueue_options.get("result_ttl") + idempotency_key = enqueue_options.get("idempotency_key") + idempotent = enqueue_options.get("idempotent") if self._interceptor is not None and not self._test_mode_active: final_args, final_kwargs = self._interceptor.intercept(final_args, final_kwargs) task_serializer = self._get_serializer(task_name) payload = task_serializer.dumps((final_args, final_kwargs)) + unique_key = self._resolve_unique_key( + task_name=task_name, + payload=payload, + unique_key=unique_key, + idempotency_key=idempotency_key, + idempotent=idempotent, + ) + dep_ids = None if depends_on is not None: dep_ids = [depends_on] if isinstance(depends_on, str) else list(depends_on) @@ -502,6 +557,8 @@ def enqueue_many( expires_list: list[float | None] | None = None, result_ttl: int | None = None, result_ttl_list: list[int | None] | None = None, + idempotency_keys: list[str | None] | None = None, + idempotent: bool | None = None, ) -> list[JobResult]: """Enqueue multiple jobs for the same task in a single transaction. @@ -515,13 +572,22 @@ def enqueue_many( timeout: Timeout in seconds for all jobs (uses default if None). delay: Uniform delay in seconds for all jobs. delay_list: Per-job delays in seconds. - unique_keys: Per-job deduplication keys. + unique_keys: Per-job deduplication keys (alias of + ``idempotency_keys`` for backwards compatibility — wins per-job + if both are set). metadata: Uniform metadata JSON string for all jobs. metadata_list: Per-job metadata JSON strings. expires: Uniform expiry in seconds for all jobs. expires_list: Per-job expiry in seconds. result_ttl: Uniform result TTL in seconds for all jobs. result_ttl_list: Per-job result TTL in seconds. + idempotency_keys: Per-job explicit dedup keys. Falls back to + auto-derivation when an entry is ``None`` and the task is + idempotent. + idempotent: Force-on (``True``) or force-off (``False``) + auto-derived dedup keys for this batch. ``None`` (the default) + falls back to the per-task setting from + ``@queue.task(idempotent=True)``. Returns: List of JobResult handles, one per enqueued job. @@ -550,6 +616,8 @@ def enqueue_many( "metadata": (metadata_list[i] if metadata_list is not None else metadata), "expires": (expires_list[i] if expires_list is not None else expires), "result_ttl": (result_ttl_list[i] if result_ttl_list is not None else result_ttl), + "idempotency_key": (idempotency_keys[i] if idempotency_keys is not None else None), + "idempotent": idempotent, } for i in range(count) ] @@ -570,7 +638,6 @@ def enqueue_many( retries_list = [opt["max_retries"] for opt in per_job_options] timeouts_list = [opt["timeout"] for opt in per_job_options] delays = [opt["delay"] for opt in per_job_options] - per_job_unique_keys = [opt["unique_key"] for opt in per_job_options] metas = [opt["metadata"] for opt in per_job_options] exp_list = [opt["expires"] for opt in per_job_options] ttl_list = [opt["result_ttl"] for opt in per_job_options] @@ -588,6 +655,17 @@ def enqueue_many( ] task_names = [task_name] * count + per_job_unique_keys = [ + self._resolve_unique_key( + task_name=task_name, + payload=payloads[i], + unique_key=per_job_options[i]["unique_key"], + idempotency_key=per_job_options[i]["idempotency_key"], + idempotent=per_job_options[i]["idempotent"], + ) + for i in range(count) + ] + py_jobs = self._inner.enqueue_batch( task_names=task_names, payloads=payloads, diff --git a/py_src/taskito/mixins/decorators.py b/py_src/taskito/mixins/decorators.py index 44d7d82..15582a0 100644 --- a/py_src/taskito/mixins/decorators.py +++ b/py_src/taskito/mixins/decorators.py @@ -46,6 +46,7 @@ class QueueDecoratorMixin: _periodic_configs: list[dict[str, Any]] _hooks: dict[str, list[Callable]] _task_serializers: dict[str, Serializer] + _task_idempotent: dict[str, bool] _task_middleware: dict[str, list[TaskMiddleware]] _task_retry_filters: dict[str, dict[str, list[type[Exception]]]] _task_inject_map: dict[str, list[str]] @@ -71,6 +72,7 @@ def task( serializer: Serializer | None = None, max_retry_delay: int | None = None, max_concurrent: int | None = None, + idempotent: bool = False, ) -> Callable[[Callable[..., Any]], TaskWrapper]: """Decorator to register a function as a background task. @@ -95,6 +97,15 @@ def task( (5 minutes) if not set. max_concurrent: Maximum number of concurrent running instances of this task. ``None`` means no limit. + idempotent: When ``True``, ``.delay()``/``.apply_async()`` calls + automatically derive a deduplication key from + ``sha256(task_name|serialized_payload)``. Two calls with + identical arguments while a job is pending or running return + the same job ID instead of producing a duplicate. The slot is + released once the job leaves the active state. Per-call + ``idempotency_key="..."`` overrides the derived key; per-call + ``idempotent=False`` disables auto-derivation for that one + submission. """ def decorator(fn: Callable) -> TaskWrapper: @@ -140,6 +151,10 @@ def decorator(fn: Callable) -> TaskWrapper: if serializer is not None: self._task_serializers[task_name] = serializer + # Store per-task idempotency flag (auto-derives unique_key on enqueue) + if idempotent: + self._task_idempotent[task_name] = True + # Store inject map for resource injection if final_inject: self._task_inject_map[task_name] = final_inject diff --git a/py_src/taskito/task.py b/py_src/taskito/task.py index 224f992..e22c6c3 100644 --- a/py_src/taskito/task.py +++ b/py_src/taskito/task.py @@ -85,6 +85,8 @@ def apply_async( depends_on: str | list[str] | None = None, expires: float | None = None, result_ttl: int | None = None, + idempotency_key: str | None = None, + idempotent: bool | None = None, ) -> JobResult: """Enqueue with full control over submission options. @@ -96,11 +98,18 @@ def apply_async( queue: Override the default queue name. max_retries: Override the default max retry count. timeout: Override the default timeout in seconds. - unique_key: Deduplication key. + unique_key: Deduplication key (alias of ``idempotency_key``). metadata: Arbitrary JSON string to attach to the job. depends_on: Job ID or list of job IDs that must complete first. expires: Seconds until the job expires (skipped if not started by then). result_ttl: Per-job result TTL in seconds. + idempotency_key: Explicit dedup key. A second submission with the + same key while the first job is pending or running returns the + existing job's ID. + idempotent: ``True`` forces auto-derivation of a key from the + serialized payload; ``False`` disables it for this call (useful + when the task is registered with ``idempotent=True`` but a + particular submission must run again). """ return self._queue.enqueue( task_name=self._task_name, @@ -116,6 +125,8 @@ def apply_async( depends_on=depends_on, expires=expires, result_ttl=result_ttl, + idempotency_key=idempotency_key, + idempotent=idempotent, ) def map(self, iterable: list[tuple]) -> list[JobResult]: diff --git a/tests/python/test_idempotent.py b/tests/python/test_idempotent.py new file mode 100644 index 0000000..fefe3ef --- /dev/null +++ b/tests/python/test_idempotent.py @@ -0,0 +1,123 @@ +"""Tests for auto-derived idempotency on @queue.task(idempotent=True).""" + +from __future__ import annotations + +import threading + +from taskito import Queue + + +def test_idempotent_task_dedupes_by_args(queue: Queue) -> None: + """Two enqueues with identical args under idempotent=True share a job.""" + + @queue.task(idempotent=True) + def charge(customer_id: int, amount: int) -> int: + return amount + + job1 = charge.delay(42, 1000) + job2 = charge.delay(42, 1000) + + assert job1.id == job2.id + + +def test_idempotent_task_distinct_args_distinct_jobs(queue: Queue) -> None: + """Different arguments produce different auto-keys → different jobs.""" + + @queue.task(idempotent=True) + def charge(customer_id: int, amount: int) -> int: + return amount + + job_a = charge.delay(1, 100) + job_b = charge.delay(2, 100) + job_c = charge.delay(1, 200) + + assert len({job_a.id, job_b.id, job_c.id}) == 3 + + +def test_idempotent_per_call_key_overrides_auto(queue: Queue) -> None: + """An explicit idempotency_key wins over the auto-derived key.""" + + @queue.task(idempotent=True) + def process(data: str) -> str: + return data + + auto_job = process.delay("payload") + explicit_job = process.apply_async(args=("payload",), idempotency_key="custom-key") + + # Auto-key and explicit-key collide on the same args only by coincidence — + # since the explicit key is "custom-key", they should be different jobs. + assert auto_job.id != explicit_job.id + + +def test_idempotent_per_call_disable_creates_new_job(queue: Queue) -> None: + """idempotent=False on apply_async overrides the per-task default.""" + + @queue.task(idempotent=True) + def process(data: str) -> str: + return data + + auto_job = process.delay("same-args") + forced_new = process.apply_async(args=("same-args",), idempotent=False) + + assert auto_job.id != forced_new.id + + +def test_non_idempotent_task_allows_duplicates(queue: Queue) -> None: + """Without idempotent=True, identical calls produce distinct jobs.""" + + @queue.task() + def process(data: str) -> str: + return data + + job1 = process.delay("payload") + job2 = process.delay("payload") + + assert job1.id != job2.id + + +def test_idempotent_clears_after_completion(queue: Queue) -> None: + """After an idempotent job finishes, the next call creates a new job.""" + + @queue.task(idempotent=True) + def fast() -> str: + return "done" + + job1 = fast.delay() + + worker = threading.Thread(target=queue.run_worker, daemon=True) + worker.start() + + job1.result(timeout=10) + + job2 = fast.delay() + assert job2.id != job1.id + + +def test_idempotent_via_enqueue_kwarg(queue: Queue) -> None: + """Per-call idempotent=True works without a registered task default.""" + + @queue.task() + def process(data: str) -> str: + return data + + job1 = process.apply_async(args=("x",), idempotent=True) + job2 = process.apply_async(args=("x",), idempotent=True) + job3 = process.apply_async(args=("y",), idempotent=True) + + assert job1.id == job2.id + assert job1.id != job3.id + + +def test_idempotent_unique_key_takes_precedence(queue: Queue) -> None: + """An explicit unique_key beats both auto-derivation and idempotency_key.""" + + @queue.task(idempotent=True) + def process(data: str) -> str: + return data + + explicit = process.apply_async(args=("payload",), unique_key="explicit-uk") + auto = process.delay("payload") + + # The auto key is "auto:" and the explicit one is "explicit-uk", + # so they must differ. + assert explicit.id != auto.id