85 changes: 85 additions & 0 deletions docs/content/docs/guides/reliability/idempotency.mdx
@@ -0,0 +1,85 @@
---
title: Idempotency
description: "Auto-derived deduplication keys — declare a task idempotent and identical calls coalesce to a single execution."
---

When a task represents an action that **must not run twice** (charging a card,
sending a webhook, applying a credit), set `idempotent=True` on the task.
Taskito derives a deduplication key from the task name and serialized arguments.
Two enqueues with identical arguments while the first job is pending or running
return the **same job ID** — the second call becomes a no-op.

## Declaring an idempotent task

```python
@queue.task(idempotent=True)
def charge_customer(customer_id: int, amount_cents: int) -> str:
return payment_provider.charge(customer_id, amount_cents)

job1 = charge_customer.delay(42, 1000)
job2 = charge_customer.delay(42, 1000)

assert job1.id == job2.id # second call coalesced into the first
```

## How the key is derived

Auto-keys follow the format `auto:<sha256(task_name|payload)[:32]>`. Hashing
happens *after* serialization, so two calls whose arguments are logically equal
but differ in type collide only when the serializer emits identical bytes — the
same contract the wire payload uses.
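
For reference, the derivation is equivalent to this small sketch, mirroring the
internal helper added in `app.py` (`payload` is the serialized `(args, kwargs)`
tuple produced on enqueue):

```python
import hashlib

def auto_idempotency_key(task_name: str, payload: bytes) -> str:
    # sha256 over "task_name|payload", truncated to 32 hex chars and prefixed
    digest = hashlib.sha256(task_name.encode("utf-8") + b"|" + payload).hexdigest()
    return f"auto:{digest[:32]}"
```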

The dedup slot is held while the job is `pending` or `running`. Once the job
reaches a terminal state (`completed`, `failed`, `dead`), the slot is freed and
a fresh call with the same arguments creates a new job.
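
For example, continuing `charge_customer` from above and assuming the first job
has already finished by the time the second call is made:

```python
first = charge_customer.delay(42, 1000)
# ...a worker runs the job to a terminal state (completed, failed, or dead)...
second = charge_customer.delay(42, 1000)

assert second.id != first.id  # the dedup slot was freed, so a new job is created
```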

## Per-call overrides

```python
# Custom semantic key (overrides the auto-derived hash)
charge_customer.apply_async(
args=(42, 1000),
idempotency_key="invoice-2026-05-08-42",
)

# Force a fresh execution even on an idempotent task
charge_customer.apply_async(args=(42, 1000), idempotent=False)
```

| Parameter | Type | Effect |
|-----------|------|--------|
| `idempotent=True` (decorator) | `bool` | Default for every `delay()` / `apply_async()` |
| `idempotency_key="…"` (call) | `str` | Explicit key. Overrides auto-derivation. |
| `idempotent=False` (call) | `bool` | Disable auto-derivation for this submission |
| `unique_key="…"` (call) | `str` | Backwards-compat alias of `idempotency_key` |

Precedence (high to low): `unique_key` → `idempotency_key` → auto-derivation.
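
For example, if a single call passes both keys, the legacy `unique_key` is the one
sent to storage:

```python
charge_customer.apply_async(
    args=(42, 1000),
    unique_key="legacy-invoice-42",            # used: highest precedence
    idempotency_key="invoice-2026-05-08-42",   # ignored when unique_key is set
)
```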

## Pitfalls

- **Don't pass moving values as arguments.** `datetime.now()`, `uuid4()`, and
  similar will produce a different hash on every call, so dedup never triggers.
  Move them inside the task body or pass a stable identifier instead (see the
  sketch after this list).
- **Argument order matters** — `charge(42, 1000)` and `charge(amount=1000, customer_id=42)`
serialize differently and therefore hash differently. Be consistent at the
call site or use an explicit `idempotency_key`.
- **Serializer matters.** If you change the per-task serializer between
releases, hashes change and in-flight jobs from the old release won't dedupe
against new submissions. Use explicit `idempotency_key` for keys that must
survive deploys.
- **Test mode skips dedup.** `queue.test_mode()` runs tasks synchronously
without storage, so idempotency is a no-op there.
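
A short illustration of the first two pitfalls. The `send_receipt` task here is
hypothetical and used only for illustration; `charge_customer` is the task from
the examples above:

```python
from datetime import datetime, timezone

# Anti-pattern: the timestamp differs on every call, so the hash never repeats
# (send_receipt is a hypothetical task, not part of this guide's API)
send_receipt.delay(order_id=7, sent_at=datetime.now(timezone.utc).isoformat())

# Better: pass only the stable identifier and read the clock inside the task body
send_receipt.delay(order_id=7)

# Positional vs. keyword arguments serialize differently, so these do NOT dedupe
charge_customer.delay(42, 1000)
charge_customer.delay(customer_id=42, amount_cents=1000)
```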

## Bulk submissions

`enqueue_many()` accepts a per-job `idempotency_keys` list, or a uniform
`idempotent=True` that auto-derives a key per row:

```python
queue.enqueue_many(
task_name="myapp.charge_customer",
args_list=[(42, 1000), (43, 2000), (42, 1000)],
idempotent=True,
)
# Third entry collides with the first → only two distinct jobs created.
```
1 change: 1 addition & 0 deletions docs/content/docs/guides/reliability/meta.json
@@ -4,6 +4,7 @@
"retries",
"error-handling",
"guarantees",
"idempotency",
"rate-limiting",
"circuit-breakers",
"locking"
84 changes: 81 additions & 3 deletions py_src/taskito/app.py
@@ -16,6 +16,7 @@
from __future__ import annotations

import functools
import hashlib
import logging
import os
from collections.abc import Callable
@@ -210,6 +211,7 @@ def __init__(
}
self._serializer: Serializer = serializer or CloudpickleSerializer()
self._task_serializers: dict[str, Serializer] = {}
self._task_idempotent: dict[str, bool] = {}
self._global_middleware: list[TaskMiddleware] = middleware or []
self._task_middleware: dict[str, list[TaskMiddleware]] = {}
self._task_retry_filters: dict[str, dict[str, list[type[Exception]]]] = {}
@@ -265,6 +267,37 @@ def _get_serializer(self, task_name: str) -> Serializer:
"""Get the serializer for a task (per-task or queue-level fallback)."""
return self._task_serializers.get(task_name, self._serializer)

@staticmethod
def _auto_idempotency_key(task_name: str, payload: bytes) -> str:
"""Derive a deterministic dedup key from task name + serialized payload."""
digest = hashlib.sha256(task_name.encode("utf-8") + b"|" + payload).hexdigest()
return f"auto:{digest[:32]}"

def _resolve_unique_key(
self,
task_name: str,
payload: bytes,
unique_key: str | None,
idempotency_key: str | None,
idempotent: bool | None,
) -> str | None:
"""Resolve which dedup key (if any) to send to storage.

Precedence: explicit unique_key > explicit idempotency_key > auto-key
when the per-call ``idempotent`` flag (or per-task default) is True.
A per-call ``idempotent=False`` disables auto-derivation even if the
task is registered as idempotent.
"""
if unique_key is not None:
return unique_key
if idempotency_key is not None:
return idempotency_key
if idempotent is False:
return None
if idempotent or self._task_idempotent.get(task_name, False):
return self._auto_idempotency_key(task_name, payload)
return None

def _deserialize_payload(self, task_name: str, payload: bytes) -> tuple:
"""Deserialize a job payload using the per-task or queue-level serializer."""
return self._get_serializer(task_name).loads(payload) # type: ignore[no-any-return]
@@ -397,6 +430,8 @@ def enqueue(
depends_on: str | list[str] | None = None,
expires: float | None = None,
result_ttl: int | None = None,
idempotency_key: str | None = None,
idempotent: bool | None = None,
) -> JobResult:
"""Enqueue a task for background execution.

@@ -409,11 +444,19 @@
queue: Target queue name.
max_retries: Max retry attempts.
timeout: Timeout in seconds.
unique_key: Deduplication key.
unique_key: Deduplication key (alias of ``idempotency_key`` — wins
if both are set, kept for backwards compatibility).
metadata: Arbitrary JSON string to attach to the job.
depends_on: Job ID or list of job IDs that must complete first.
expires: Seconds until the job expires (skipped if not started by then).
result_ttl: Per-job result TTL in seconds. Overrides global result_ttl.
idempotency_key: Explicit dedup key. A second enqueue with the same
key while the first job is pending or running returns the
existing job's ID instead of creating a duplicate.
idempotent: Force-on (``True``) or force-off (``False``) the
auto-derived dedup key for this single call. ``None`` (the
default) falls back to the per-task setting from
``@queue.task(idempotent=True)``.
"""
final_args = args
final_kwargs = kwargs or {}
@@ -430,6 +473,8 @@
"depends_on": depends_on,
"expires": expires,
"result_ttl": result_ttl,
"idempotency_key": idempotency_key,
"idempotent": idempotent,
}
for mw in self._global_middleware:
try:
@@ -448,12 +493,22 @@
depends_on = enqueue_options.get("depends_on")
expires = enqueue_options.get("expires")
result_ttl = enqueue_options.get("result_ttl")
idempotency_key = enqueue_options.get("idempotency_key")
idempotent = enqueue_options.get("idempotent")

if self._interceptor is not None and not self._test_mode_active:
final_args, final_kwargs = self._interceptor.intercept(final_args, final_kwargs)
task_serializer = self._get_serializer(task_name)
payload = task_serializer.dumps((final_args, final_kwargs))

unique_key = self._resolve_unique_key(
task_name=task_name,
payload=payload,
unique_key=unique_key,
idempotency_key=idempotency_key,
idempotent=idempotent,
)

dep_ids = None
if depends_on is not None:
dep_ids = [depends_on] if isinstance(depends_on, str) else list(depends_on)
@@ -502,6 +557,8 @@ def enqueue_many(
expires_list: list[float | None] | None = None,
result_ttl: int | None = None,
result_ttl_list: list[int | None] | None = None,
idempotency_keys: list[str | None] | None = None,
idempotent: bool | None = None,
) -> list[JobResult]:
"""Enqueue multiple jobs for the same task in a single transaction.

@@ -515,13 +572,22 @@
timeout: Timeout in seconds for all jobs (uses default if None).
delay: Uniform delay in seconds for all jobs.
delay_list: Per-job delays in seconds.
unique_keys: Per-job deduplication keys.
unique_keys: Per-job deduplication keys (alias of
``idempotency_keys`` for backwards compatibility — wins per-job
if both are set).
metadata: Uniform metadata JSON string for all jobs.
metadata_list: Per-job metadata JSON strings.
expires: Uniform expiry in seconds for all jobs.
expires_list: Per-job expiry in seconds.
result_ttl: Uniform result TTL in seconds for all jobs.
result_ttl_list: Per-job result TTL in seconds.
idempotency_keys: Per-job explicit dedup keys. Falls back to
auto-derivation when an entry is ``None`` and the task is
idempotent.
idempotent: Force-on (``True``) or force-off (``False``)
auto-derived dedup keys for this batch. ``None`` (the default)
falls back to the per-task setting from
``@queue.task(idempotent=True)``.

Returns:
List of JobResult handles, one per enqueued job.
@@ -550,6 +616,8 @@ def enqueue_many(
"metadata": (metadata_list[i] if metadata_list is not None else metadata),
"expires": (expires_list[i] if expires_list is not None else expires),
"result_ttl": (result_ttl_list[i] if result_ttl_list is not None else result_ttl),
"idempotency_key": (idempotency_keys[i] if idempotency_keys is not None else None),
"idempotent": idempotent,
}
for i in range(count)
]
@@ -570,7 +638,6 @@ def enqueue_many(
retries_list = [opt["max_retries"] for opt in per_job_options]
timeouts_list = [opt["timeout"] for opt in per_job_options]
delays = [opt["delay"] for opt in per_job_options]
per_job_unique_keys = [opt["unique_key"] for opt in per_job_options]
metas = [opt["metadata"] for opt in per_job_options]
exp_list = [opt["expires"] for opt in per_job_options]
ttl_list = [opt["result_ttl"] for opt in per_job_options]
@@ -588,6 +655,17 @@
]
task_names = [task_name] * count

per_job_unique_keys = [
self._resolve_unique_key(
task_name=task_name,
payload=payloads[i],
unique_key=per_job_options[i]["unique_key"],
idempotency_key=per_job_options[i]["idempotency_key"],
idempotent=per_job_options[i]["idempotent"],
)
for i in range(count)
]

py_jobs = self._inner.enqueue_batch(
task_names=task_names,
payloads=payloads,
15 changes: 15 additions & 0 deletions py_src/taskito/mixins/decorators.py
@@ -46,6 +46,7 @@ class QueueDecoratorMixin:
_periodic_configs: list[dict[str, Any]]
_hooks: dict[str, list[Callable]]
_task_serializers: dict[str, Serializer]
_task_idempotent: dict[str, bool]
_task_middleware: dict[str, list[TaskMiddleware]]
_task_retry_filters: dict[str, dict[str, list[type[Exception]]]]
_task_inject_map: dict[str, list[str]]
@@ -71,6 +72,7 @@ def task(
serializer: Serializer | None = None,
max_retry_delay: int | None = None,
max_concurrent: int | None = None,
idempotent: bool = False,
) -> Callable[[Callable[..., Any]], TaskWrapper]:
"""Decorator to register a function as a background task.

@@ -95,6 +97,15 @@
(5 minutes) if not set.
max_concurrent: Maximum number of concurrent running instances of
this task. ``None`` means no limit.
idempotent: When ``True``, ``.delay()``/``.apply_async()`` calls
automatically derive a deduplication key from
``sha256(task_name|serialized_payload)``. Two calls with
identical arguments while a job is pending or running return
the same job ID instead of producing a duplicate. The slot is
released once the job leaves the active state. Per-call
``idempotency_key="..."`` overrides the derived key; per-call
``idempotent=False`` disables auto-derivation for that one
submission.
"""

def decorator(fn: Callable) -> TaskWrapper:
@@ -140,6 +151,10 @@ def decorator(fn: Callable) -> TaskWrapper:
if serializer is not None:
self._task_serializers[task_name] = serializer

# Store per-task idempotency flag (auto-derives unique_key on enqueue)
if idempotent:
self._task_idempotent[task_name] = True

# Store inject map for resource injection
if final_inject:
self._task_inject_map[task_name] = final_inject
13 changes: 12 additions & 1 deletion py_src/taskito/task.py
@@ -85,6 +85,8 @@ def apply_async(
depends_on: str | list[str] | None = None,
expires: float | None = None,
result_ttl: int | None = None,
idempotency_key: str | None = None,
idempotent: bool | None = None,
) -> JobResult:
"""Enqueue with full control over submission options.

@@ -96,11 +98,18 @@
queue: Override the default queue name.
max_retries: Override the default max retry count.
timeout: Override the default timeout in seconds.
unique_key: Deduplication key.
unique_key: Deduplication key (alias of ``idempotency_key``).
metadata: Arbitrary JSON string to attach to the job.
depends_on: Job ID or list of job IDs that must complete first.
expires: Seconds until the job expires (skipped if not started by then).
result_ttl: Per-job result TTL in seconds.
idempotency_key: Explicit dedup key. A second submission with the
same key while the first job is pending or running returns the
existing job's ID.
idempotent: ``True`` forces auto-derivation of a key from the
serialized payload; ``False`` disables it for this call (useful
when the task is registered with ``idempotent=True`` but a
particular submission must run again).
"""
return self._queue.enqueue(
task_name=self._task_name,
@@ -116,6 +125,8 @@
depends_on=depends_on,
expires=expires,
result_ttl=result_ttl,
idempotency_key=idempotency_key,
idempotent=idempotent,
)

def map(self, iterable: list[tuple]) -> list[JobResult]: