From fb5640600e28a52823580b5f8fb99446c5ab4746 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 3 May 2026 01:45:41 +0530 Subject: [PATCH 1/2] docs(docs-next): port api-reference overview + queue subsection Eight files: api-reference overview + queue/{index,jobs,queues,workers,resources,events}. Material `:::tip` admonitions become Fumadocs Callouts; mkdocstrings `::: taskito.X.Y` autodoc directives are dropped (no Fumadocs equivalent yet); pipe characters in type unions escaped for markdown tables. --- .../content/docs/api-reference/meta.json | 12 +- .../content/docs/api-reference/overview.mdx | 17 +- .../docs/api-reference/queue/events.mdx | 76 +++++++ .../docs/api-reference/queue/index.mdx | 204 ++++++++++++++++++ .../content/docs/api-reference/queue/jobs.mdx | 140 ++++++++++++ .../docs/api-reference/queue/meta.json | 4 + .../docs/api-reference/queue/queues.mdx | 165 ++++++++++++++ .../docs/api-reference/queue/resources.mdx | 178 +++++++++++++++ .../docs/api-reference/queue/workers.mdx | 134 ++++++++++++ 9 files changed, 924 insertions(+), 6 deletions(-) create mode 100644 docs-next/content/docs/api-reference/queue/events.mdx create mode 100644 docs-next/content/docs/api-reference/queue/index.mdx create mode 100644 docs-next/content/docs/api-reference/queue/jobs.mdx create mode 100644 docs-next/content/docs/api-reference/queue/meta.json create mode 100644 docs-next/content/docs/api-reference/queue/queues.mdx create mode 100644 docs-next/content/docs/api-reference/queue/resources.mdx create mode 100644 docs-next/content/docs/api-reference/queue/workers.mdx diff --git a/docs-next/content/docs/api-reference/meta.json b/docs-next/content/docs/api-reference/meta.json index af07d0a..d54a81b 100644 --- a/docs-next/content/docs/api-reference/meta.json +++ b/docs-next/content/docs/api-reference/meta.json @@ -1,4 +1,14 @@ { "title": "API Reference", - "pages": ["overview"] + "pages": [ + "overview", + "queue", + "task", + "result", + "context", + "canvas", + "workflows", + "testing", + "cli" + ] } diff --git a/docs-next/content/docs/api-reference/overview.mdx b/docs-next/content/docs/api-reference/overview.mdx index db008ae..162254d 100644 --- a/docs-next/content/docs/api-reference/overview.mdx +++ b/docs-next/content/docs/api-reference/overview.mdx @@ -1,10 +1,17 @@ --- title: Overview -description: "Complete API surface." +description: "Complete Python API reference for all public classes and methods." --- -import { Callout } from 'fumadocs-ui/components/callout'; +Complete Python API reference for all public classes and methods. - - Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text. - +| Class | Description | +|-------|-------------| +| [Queue](/docs/api-reference/queue) | Central orchestrator — task registration, enqueue, workers, and all queue operations | +| [TaskWrapper](/docs/api-reference/task) | Handle returned by `@queue.task()` — `delay()`, `apply_async()`, `map()`, signatures | +| [JobResult](/docs/api-reference/result) | Handle for an enqueued job — status polling, result retrieval, dependencies | +| [JobContext](/docs/api-reference/context) | Runtime context inside a running task — job ID, retry count, progress updates | +| [Canvas](/docs/api-reference/canvas) | Workflow primitives — `Signature`, `chain`, `group`, `chord` | +| [Workflows](/docs/api-reference/workflows) | DAG workflow builder, `WorkflowRun`, gates, sub-workflows | +| [Testing](/docs/api-reference/testing) | Test mode, `TestResult`, `MockResource` for unit testing tasks | +| [CLI](/docs/api-reference/cli) | `taskito` command-line interface — `worker`, `info`, `scaler` | diff --git a/docs-next/content/docs/api-reference/queue/events.mdx b/docs-next/content/docs/api-reference/queue/events.mdx new file mode 100644 index 0000000..7fceb57 --- /dev/null +++ b/docs-next/content/docs/api-reference/queue/events.mdx @@ -0,0 +1,76 @@ +--- +title: Events & Logs +description: "Event callbacks, webhook delivery, and structured task logging." +--- + +Methods for event callbacks, webhook registration, and structured task logging. + +## Events & webhooks + +### `queue.on_event()` + +```python +queue.on_event(event: str) -> Callable +``` + +Register a callback for a queue event. Supported events: `job.completed`, +`job.failed`, `job.retried`, `job.dead`. + +```python +@queue.on_event("job.failed") +def handle_failure(job_id: str, task_name: str, error: str) -> None: + ... +``` + +### `queue.add_webhook()` + +```python +queue.add_webhook( + url: str, + events: list[EventType] | None = None, + headers: dict[str, str] | None = None, + secret: str | None = None, + max_retries: int = 3, + timeout: float = 10.0, + retry_backoff: float = 2.0, +) -> None +``` + +Register a webhook URL for one or more events. 4xx responses are not retried; +5xx responses are retried with exponential backoff. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `url` | `str` | — | URL to POST to. Must be `http://` or `https://`. | +| `events` | `list[EventType] \| None` | `None` | Event types to subscribe to. `None` means all events. | +| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers to include. | +| `secret` | `str \| None` | `None` | HMAC-SHA256 signing secret for `X-Taskito-Signature`. | +| `max_retries` | `int` | `3` | Maximum delivery attempts. | +| `timeout` | `float` | `10.0` | HTTP request timeout in seconds. | +| `retry_backoff` | `float` | `2.0` | Base for exponential backoff between retries. | + +## Logs + +### `queue.task_logs()` + +```python +queue.task_logs(job_id: str, limit: int = 100) -> list[dict] +``` + +Return structured log entries emitted by `current_job.log()` during the given +job's execution. + +### `queue.query_logs()` + +```python +queue.query_logs( + task_name: str | None = None, + level: str | None = None, + message_like: str | None = None, + since: float | None = None, + limit: int = 100, + offset: int = 0, +) -> list[dict] +``` + +Query task logs across all jobs with optional filters. diff --git a/docs-next/content/docs/api-reference/queue/index.mdx b/docs-next/content/docs/api-reference/queue/index.mdx new file mode 100644 index 0000000..6b27f86 --- /dev/null +++ b/docs-next/content/docs/api-reference/queue/index.mdx @@ -0,0 +1,204 @@ +--- +title: Queue +description: "The central class for creating and managing a task queue." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +The central class for creating and managing a task queue. + + + The Queue API is split across several pages for readability: + + - **[Job Management](/docs/api-reference/queue/jobs)** — get, list, cancel, archive, replay jobs + - **[Queue & Stats](/docs/api-reference/queue/queues)** — rate limits, concurrency, pause/resume, statistics, dead letters + - **[Workers & Hooks](/docs/api-reference/queue/workers)** — run workers, lifecycle hooks, circuit breakers, async methods + - **[Resources & Locking](/docs/api-reference/queue/resources)** — resource system, distributed locks + - **[Events & Logs](/docs/api-reference/queue/events)** — event callbacks, webhooks, structured logs + + +## Constructor + +```python +Queue( + db_path: str = ".taskito/taskito.db", + workers: int = 0, + default_retry: int = 3, + default_timeout: int = 300, + default_priority: int = 0, + result_ttl: int | None = None, + middleware: list[TaskMiddleware] | None = None, + drain_timeout: int = 30, + interception: str = "off", + max_intercept_depth: int = 10, + recipe_signing_key: str | None = None, + max_reconstruction_timeout: int = 10, + file_path_allowlist: list[str] | None = None, + disabled_proxies: list[str] | None = None, + async_concurrency: int = 100, + event_workers: int = 4, + scheduler_poll_interval_ms: int = 50, + scheduler_reap_interval: int = 100, + scheduler_cleanup_interval: int = 1200, + namespace: str | None = None, +) +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `db_path` | `str` | `".taskito/taskito.db"` | Path to SQLite database file. Parent directories are created automatically. | +| `workers` | `int` | `0` | Number of worker threads (`0` = auto-detect CPU count) | +| `default_retry` | `int` | `3` | Default max retry attempts for tasks | +| `default_timeout` | `int` | `300` | Default task timeout in seconds | +| `default_priority` | `int` | `0` | Default task priority (higher = more urgent) | +| `result_ttl` | `int \| None` | `None` | Auto-cleanup completed/dead jobs older than this many seconds. `None` disables. | +| `middleware` | `list[TaskMiddleware] \| None` | `None` | Queue-level middleware applied to all tasks. | +| `drain_timeout` | `int` | `30` | Seconds to wait for in-flight tasks during graceful shutdown. | +| `interception` | `str` | `"off"` | Argument interception mode: `"strict"`, `"lenient"`, or `"off"`. See [Resource System](/docs/guides/resources). | +| `max_intercept_depth` | `int` | `10` | Max recursion depth for argument walking. | +| `recipe_signing_key` | `str \| None` | `None` | HMAC-SHA256 key for proxy recipe integrity. Falls back to `TASKITO_RECIPE_SECRET` env var. | +| `max_reconstruction_timeout` | `int` | `10` | Max seconds allowed for proxy reconstruction. | +| `file_path_allowlist` | `list[str] \| None` | `None` | Allowed file path prefixes for the file proxy handler. | +| `disabled_proxies` | `list[str] \| None` | `None` | Handler names to skip when registering built-in proxy handlers. | +| `async_concurrency` | `int` | `100` | Maximum number of `async def` tasks running concurrently on the native async executor. | +| `event_workers` | `int` | `4` | Thread pool size for the event bus. Increase for high event volume. | +| `scheduler_poll_interval_ms` | `int` | `50` | Milliseconds between scheduler poll cycles. Lower values improve scheduling precision at the cost of CPU. | +| `scheduler_reap_interval` | `int` | `100` | Reap stale/timed-out jobs every N poll cycles. | +| `scheduler_cleanup_interval` | `int` | `1200` | Clean up old completed jobs every N poll cycles. | +| `namespace` | `str \| None` | `None` | Namespace for multi-tenant isolation. Jobs enqueued on this queue carry this namespace; workers only dequeue matching jobs. `None` means no namespace (default). | + +## Task registration + +### `@queue.task()` + +```python +@queue.task( + name: str | None = None, + max_retries: int = 3, + retry_backoff: float = 1.0, + retry_delays: list[float] | None = None, + max_retry_delay: int | None = None, + timeout: int = 300, + soft_timeout: float | None = None, + priority: int = 0, + rate_limit: str | None = None, + queue: str = "default", + circuit_breaker: dict | None = None, + middleware: list[TaskMiddleware] | None = None, + expires: float | None = None, + inject: list[str] | None = None, + serializer: Serializer | None = None, + max_concurrent: int | None = None, +) -> TaskWrapper +``` + +Register a function as a background task. Returns a [`TaskWrapper`](/docs/api-reference/task). + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `name` | `str \| None` | Auto-generated | Explicit task name. Defaults to `module.qualname`. | +| `max_retries` | `int` | `3` | Max retry attempts before moving to DLQ. | +| `retry_backoff` | `float` | `1.0` | Base delay in seconds for exponential backoff. | +| `retry_delays` | `list[float] \| None` | `None` | Per-attempt delays in seconds, overrides backoff. e.g. `[1, 5, 30]`. | +| `max_retry_delay` | `int \| None` | `None` | Cap on backoff delay in seconds. Defaults to 300 s. | +| `timeout` | `int` | `300` | Hard execution time limit in seconds. | +| `soft_timeout` | `float \| None` | `None` | Cooperative time limit checked via `current_job.check_timeout()`. | +| `priority` | `int` | `0` | Default priority (higher = more urgent). | +| `rate_limit` | `str \| None` | `None` | Rate limit string, e.g. `"100/m"`. | +| `queue` | `str` | `"default"` | Named queue to submit to. | +| `circuit_breaker` | `dict \| None` | `None` | Circuit breaker config: `{"threshold": 5, "window": 60, "cooldown": 120}`. | +| `middleware` | `list[TaskMiddleware] \| None` | `None` | Per-task middleware, applied in addition to queue-level middleware. | +| `expires` | `float \| None` | `None` | Seconds until the job expires if not started. | +| `inject` | `list[str] \| None` | `None` | Resource names to inject as keyword arguments. See [Resource System](/docs/guides/resources). | +| `serializer` | `Serializer \| None` | `None` | Per-task serializer override. Falls back to queue-level serializer. | +| `max_concurrent` | `int \| None` | `None` | Max concurrent running instances. `None` = no limit. | + +### `@queue.periodic()` + +```python +@queue.periodic( + cron: str, + name: str | None = None, + args: tuple = (), + kwargs: dict | None = None, + queue: str = "default", + timezone: str | None = None, +) -> TaskWrapper +``` + +Register a periodic (cron-scheduled) task. Uses 6-field cron expressions with seconds. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `cron` | `str` | — | 6-field cron expression (seconds precision). | +| `name` | `str \| None` | Auto-generated | Explicit task name. | +| `args` | `tuple` | `()` | Positional arguments passed to the task on each run. | +| `kwargs` | `dict \| None` | `None` | Keyword arguments passed to the task on each run. | +| `queue` | `str` | `"default"` | Named queue to submit to. | +| `timezone` | `str \| None` | `None` | IANA timezone name (e.g. `"America/New_York"`). Defaults to UTC. | + +## Enqueue methods + +### `queue.enqueue()` + +```python +queue.enqueue( + task_name: str, + args: tuple = (), + kwargs: dict | None = None, + priority: int | None = None, + delay: float | None = None, + queue: str | None = None, + max_retries: int | None = None, + timeout: int | None = None, + unique_key: str | None = None, + metadata: str | None = None, + depends_on: str | list[str] | None = None, +) -> JobResult +``` + +Enqueue a task for execution. Returns a [`JobResult`](/docs/api-reference/result) handle. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `depends_on` | `str \| list[str] \| None` | `None` | Job ID(s) this job depends on. See [Dependencies](/docs/guides/advanced-execution). | + +### `queue.enqueue_many()` + +```python +queue.enqueue_many( + task_name: str, + args_list: list[tuple], + kwargs_list: list[dict] | None = None, + priority: int | None = None, + queue: str | None = None, + max_retries: int | None = None, + timeout: int | None = None, + delay: float | None = None, + delay_list: list[float | None] | None = None, + unique_keys: list[str | None] | None = None, + metadata: str | None = None, + metadata_list: list[str | None] | None = None, + expires: float | None = None, + expires_list: list[float | None] | None = None, + result_ttl: int | None = None, + result_ttl_list: list[int | None] | None = None, +) -> list[JobResult] +``` + +Enqueue multiple jobs in a single transaction for high throughput. Supports both +uniform parameters (applied to all jobs) and per-job lists. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `delay` | `float \| None` | `None` | Uniform delay in seconds for all jobs | +| `delay_list` | `list[float \| None] \| None` | `None` | Per-job delays in seconds | +| `unique_keys` | `list[str \| None] \| None` | `None` | Per-job deduplication keys | +| `metadata` | `str \| None` | `None` | Uniform metadata JSON for all jobs | +| `metadata_list` | `list[str \| None] \| None` | `None` | Per-job metadata JSON | +| `expires` | `float \| None` | `None` | Uniform expiry in seconds for all jobs | +| `expires_list` | `list[float \| None] \| None` | `None` | Per-job expiry in seconds | +| `result_ttl` | `int \| None` | `None` | Uniform result TTL in seconds | +| `result_ttl_list` | `list[int \| None] \| None` | `None` | Per-job result TTL in seconds | + +Per-job lists (`*_list`) take precedence over uniform values when both are provided. diff --git a/docs-next/content/docs/api-reference/queue/jobs.mdx b/docs-next/content/docs/api-reference/queue/jobs.mdx new file mode 100644 index 0000000..cbd22d5 --- /dev/null +++ b/docs-next/content/docs/api-reference/queue/jobs.mdx @@ -0,0 +1,140 @@ +--- +title: Job Management +description: "Methods for retrieving, filtering, cancelling, archiving, and replaying jobs." +--- + +Methods for retrieving, filtering, cancelling, archiving, and replaying jobs. + +## Job retrieval + +### `queue.get_job()` + +```python +queue.get_job(job_id: str) -> JobResult | None +``` + +Retrieve a job by ID. Returns `None` if not found. + +### `queue.list_jobs()` + +```python +queue.list_jobs( + status: str | None = None, + queue: str | None = None, + task_name: str | None = None, + limit: int = 50, + offset: int = 0, + namespace: str | None = _UNSET, +) -> list[JobResult] +``` + +List jobs with optional filters. Returns newest first. Defaults to the queue's +namespace — pass `namespace=None` to see all namespaces. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `status` | `str \| None` | `None` | Filter by status: `pending`, `running`, `completed`, `failed`, `dead`, `cancelled` | +| `queue` | `str \| None` | `None` | Filter by queue name | +| `task_name` | `str \| None` | `None` | Filter by task name | +| `limit` | `int` | `50` | Maximum results to return | +| `offset` | `int` | `0` | Pagination offset | + +### `queue.list_jobs_filtered()` + +```python +queue.list_jobs_filtered( + status: str | None = None, + queue: str | None = None, + task_name: str | None = None, + metadata_like: str | None = None, + error_like: str | None = None, + created_after: float | None = None, + created_before: float | None = None, + limit: int = 50, + offset: int = 0, + namespace: str | None = _UNSET, +) -> list[JobResult] +``` + +Extended filtering with metadata and error pattern matching and time range +constraints. Defaults to the queue's namespace — pass `namespace=None` to see +all namespaces. + +## Job operations + +### `queue.cancel_job()` + +```python +queue.cancel_job(job_id: str) -> bool +``` + +Cancel a pending job. Returns `True` if cancelled, `False` if not pending. +Cascade-cancels dependents. + +### `queue.update_progress()` + +```python +queue.update_progress(job_id: str, progress: int) -> None +``` + +Update progress for a running job (0–100). + +### `queue.job_errors()` + +```python +queue.job_errors(job_id: str) -> list[dict] +``` + +Get error history for a job. Returns a list of dicts with `id`, `job_id`, +`attempt`, `error`, `failed_at`. + +### `queue.job_dag()` + +```python +queue.job_dag(job_id: str) -> dict +``` + +Return a dependency graph for a job, including all ancestors and descendants. +Useful for visualizing workflow chains. + +## Archival + +### `queue.archive()` + +```python +queue.archive(job_id: str) -> None +``` + +Move a completed or failed job to the archive for long-term retention. + +### `queue.list_archived()` + +```python +queue.list_archived( + task_name: str | None = None, + limit: int = 50, + offset: int = 0, +) -> list[dict] +``` + +List archived jobs with optional task name filter. + +## Replay + +### `queue.replay()` + +```python +queue.replay(job_id: str) -> JobResult +``` + +Re-enqueue a completed or failed job with its original arguments. Returns the +new job handle. + +### `queue.replay_history()` + +```python +queue.replay_history(job_id: str) -> list[dict] +``` + +Return the replay log for a job — every time it has been replayed and the +resulting new job IDs. diff --git a/docs-next/content/docs/api-reference/queue/meta.json b/docs-next/content/docs/api-reference/queue/meta.json new file mode 100644 index 0000000..6efa78f --- /dev/null +++ b/docs-next/content/docs/api-reference/queue/meta.json @@ -0,0 +1,4 @@ +{ + "title": "Queue", + "pages": ["index", "jobs", "queues", "workers", "resources", "events"] +} diff --git a/docs-next/content/docs/api-reference/queue/queues.mdx b/docs-next/content/docs/api-reference/queue/queues.mdx new file mode 100644 index 0000000..0c8be07 --- /dev/null +++ b/docs-next/content/docs/api-reference/queue/queues.mdx @@ -0,0 +1,165 @@ +--- +title: Queue & Stats +description: "Queue management, statistics, and dead letter operations." +--- + +Methods for managing queues, collecting statistics, and handling dead letters. + +## Queue management + +### `queue.set_queue_rate_limit()` + +```python +queue.set_queue_rate_limit(queue_name: str, rate_limit: str) -> None +``` + +Set a rate limit for all jobs in a queue. Checked by the scheduler before +per-task rate limits. + +| Parameter | Type | Description | +|---|---|---| +| `queue_name` | `str` | Queue name (e.g. `"default"`). | +| `rate_limit` | `str` | Rate limit string: `"N/s"`, `"N/m"`, or `"N/h"`. | + +### `queue.set_queue_concurrency()` + +```python +queue.set_queue_concurrency(queue_name: str, max_concurrent: int) -> None +``` + +Set a maximum number of concurrently running jobs for a queue across all workers. +Checked by the scheduler before per-task `max_concurrent` limits. + +| Parameter | Type | Description | +|---|---|---| +| `queue_name` | `str` | Queue name (e.g. `"default"`). | +| `max_concurrent` | `int` | Maximum simultaneous running jobs from this queue. | + +### `queue.pause()` + +```python +queue.pause(queue_name: str) -> None +``` + +Pause a named queue. Workers continue running but skip jobs in this queue until +it is resumed. + +### `queue.resume()` + +```python +queue.resume(queue_name: str) -> None +``` + +Resume a previously paused queue. + +### `queue.paused_queues()` + +```python +queue.paused_queues() -> list[str] +``` + +Return the names of all currently paused queues. + +### `queue.purge()` + +```python +queue.purge( + queue: str | None = None, + task_name: str | None = None, + status: str | None = None, +) -> int +``` + +Delete jobs matching the given filters. Returns the count deleted. + +### `queue.revoke_task()` + +```python +queue.revoke_task(task_name: str) -> None +``` + +Prevent all future enqueues of the given task name. Existing pending jobs are +not affected. + +## Statistics + +### `queue.stats()` + +```python +queue.stats() -> dict[str, int] +``` + +Returns `{"pending": N, "running": N, "completed": N, "failed": N, "dead": N, "cancelled": N}`. + +### `queue.stats_by_queue()` + +```python +queue.stats_by_queue() -> dict[str, dict[str, int]] +``` + +Returns per-queue status counts: `{queue_name: {"pending": N, ...}}`. + +### `queue.stats_all_queues()` + +```python +queue.stats_all_queues() -> dict[str, dict[str, int]] +``` + +Returns stats for all queues including those with zero jobs. + +### `queue.metrics()` + +```python +queue.metrics() -> dict +``` + +Returns current throughput and latency snapshot. + +### `queue.metrics_timeseries()` + +```python +queue.metrics_timeseries( + window: int = 3600, + bucket: int = 60, +) -> list[dict] +``` + +Returns historical metrics bucketed by time. `window` is the lookback period in +seconds; `bucket` is the bucket size in seconds. + +## Dead letter queue + +### `queue.dead_letters()` + +```python +queue.dead_letters(limit: int = 10, offset: int = 0) -> list[dict] +``` + +List dead letter entries. Each dict contains: `id`, `original_job_id`, `queue`, +`task_name`, `error`, `retry_count`, `failed_at`, `metadata`. + +### `queue.retry_dead()` + +```python +queue.retry_dead(dead_id: str) -> str +``` + +Re-enqueue a dead letter job. Returns the new job ID. + +### `queue.purge_dead()` + +```python +queue.purge_dead(older_than: int = 86400) -> int +``` + +Purge dead letter entries older than `older_than` seconds. Returns count deleted. + +## Cleanup + +### `queue.purge_completed()` + +```python +queue.purge_completed(older_than: int = 86400) -> int +``` + +Purge completed jobs older than `older_than` seconds. Returns count deleted. diff --git a/docs-next/content/docs/api-reference/queue/resources.mdx b/docs-next/content/docs/api-reference/queue/resources.mdx new file mode 100644 index 0000000..b7a9b2d --- /dev/null +++ b/docs-next/content/docs/api-reference/queue/resources.mdx @@ -0,0 +1,178 @@ +--- +title: Resources & Locking +description: "Worker resource system, type registration, and distributed locking." +--- + +Methods for the worker resource system and distributed locking. + +## Resource system + +### `@queue.worker_resource()` + +```python +@queue.worker_resource( + name: str, + depends_on: list[str] | None = None, + teardown: Callable | None = None, + health_check: Callable | None = None, + health_check_interval: float = 0.0, + max_recreation_attempts: int = 3, + scope: str = "worker", + pool_size: int | None = None, + pool_min: int = 0, + acquire_timeout: float = 10.0, + max_lifetime: float = 3600.0, + idle_timeout: float = 300.0, + reloadable: bool = False, + frozen: bool = False, +) -> Callable +``` + +Decorator to register a resource factory initialized at worker startup. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `name` | `str` | — | Resource name used in `inject=["name"]` or `Inject["name"]`. | +| `depends_on` | `list[str] \| None` | `None` | Names of resources this one depends on. | +| `teardown` | `Callable \| None` | `None` | Called with the resource instance on shutdown. | +| `health_check` | `Callable \| None` | `None` | Called periodically; returns truthy if healthy. | +| `health_check_interval` | `float` | `0.0` | Seconds between health checks (0 = disabled). | +| `max_recreation_attempts` | `int` | `3` | Max times to recreate on health failure. | +| `scope` | `str` | `"worker"` | Lifetime scope: `"worker"`, `"task"`, `"thread"`, or `"request"`. | +| `pool_size` | `int \| None` | `None` | Pool capacity for task-scoped resources (default = worker thread count). | +| `pool_min` | `int` | `0` | Pre-warmed instances for task-scoped resources. | +| `acquire_timeout` | `float` | `10.0` | Max seconds to wait for a pool instance. | +| `max_lifetime` | `float` | `3600.0` | Max seconds a pooled instance can live. | +| `idle_timeout` | `float` | `300.0` | Max idle seconds before pool eviction. | +| `reloadable` | `bool` | `False` | Allow hot-reload via SIGHUP. | +| `frozen` | `bool` | `False` | Wrap in a read-only proxy that blocks attribute writes. | + +### `queue.register_resource()` + +```python +queue.register_resource(definition: ResourceDefinition) -> None +``` + +Programmatically register a `ResourceDefinition`. Equivalent to +`@queue.worker_resource()` but accepts a pre-built definition object. + +### `queue.load_resources()` + +```python +queue.load_resources(toml_path: str) -> None +``` + +Load resource definitions from a TOML file. Must be called before `run_worker()`. +See [TOML configuration](/docs/guides/resources). + +### `queue.health_check()` + +```python +queue.health_check(name: str) -> bool +``` + +Run a resource's health check immediately. Returns `True` if healthy, `False` +otherwise or if the runtime is not initialized. + +### `queue.resource_status()` + +```python +queue.resource_status() -> list[dict] +``` + +Return per-resource status. Each entry contains: `name`, `scope`, `health`, +`init_duration_ms`, `recreations`, `depends_on`. Task-scoped entries also +include `pool` stats. + +### `queue.register_type()` + +```python +queue.register_type( + python_type: type, + strategy: str, + *, + resource: str | None = None, + message: str | None = None, + converter: Callable | None = None, + type_key: str | None = None, + proxy_handler: str | None = None, +) -> None +``` + +Register a custom type with the interception system. Requires +`interception="strict"` or `"lenient"`. + +| Parameter | Type | Description | +|---|---|---| +| `python_type` | `type` | The type to register. | +| `strategy` | `str` | `"pass"`, `"convert"`, `"redirect"`, `"reject"`, or `"proxy"`. | +| `resource` | `str \| None` | Resource name for `"redirect"` strategy. | +| `message` | `str \| None` | Rejection reason for `"reject"` strategy. | +| `converter` | `Callable \| None` | Converter callable for `"convert"` strategy. | +| `type_key` | `str \| None` | Dispatch key for the converter reconstructor. | +| `proxy_handler` | `str \| None` | Handler name for `"proxy"` strategy. | + +### `queue.interception_stats()` + +```python +queue.interception_stats() -> dict +``` + +Return interception metrics: total call count, per-strategy counts, average +duration in ms, max depth reached. Returns an empty dict if interception is +disabled. + +### `queue.proxy_stats()` + +```python +queue.proxy_stats() -> list[dict] +``` + +Return per-handler proxy metrics: handler name, deconstruction count, +reconstruction count, error count, average reconstruction time in ms. + +## Distributed locking + +### `queue.lock()` + +```python +queue.lock( + name: str, + ttl: int = 30, + auto_extend: bool = True, + owner_id: str | None = None, + timeout: float | None = None, + retry_interval: float = 0.1, +) -> contextlib.AbstractContextManager +``` + +Acquire a distributed lock. Use as a context manager: + +```python +with queue.lock("my-resource", ttl=60): + # exclusive section + ... +``` + +Raises `LockNotAcquired` if acquisition fails (when `timeout` is `None` or expires). + +### `queue.alock()` + +```python +queue.alock( + name: str, + ttl: float = 30.0, + auto_extend: bool = True, + owner_id: str | None = None, + timeout: float | None = None, + retry_interval: float = 0.1, +) -> AsyncDistributedLock +``` + +Async context manager version of `lock()`. Returns an `AsyncDistributedLock` +directly — use `async with`, not `await`: + +```python +async with queue.alock("my-resource"): + ... +``` diff --git a/docs-next/content/docs/api-reference/queue/workers.mdx b/docs-next/content/docs/api-reference/queue/workers.mdx new file mode 100644 index 0000000..2bd1784 --- /dev/null +++ b/docs-next/content/docs/api-reference/queue/workers.mdx @@ -0,0 +1,134 @@ +--- +title: Workers & Hooks +description: "Worker lifecycle, hooks, circuit breakers, and the sync/async method mapping." +--- + +Methods for running workers, lifecycle hooks, circuit breakers, and the +sync/async method mapping. + +## Workers + +### `queue.run_worker()` + +```python +queue.run_worker( + queues: Sequence[str] | None = None, + tags: list[str] | None = None, + pool: str = "thread", + app: str | None = None, +) -> None +``` + +Start the worker loop. **Blocks** until interrupted. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `queues` | `Sequence[str] \| None` | `None` | Queue names to consume from. `None` = all. | +| `tags` | `list[str] \| None` | `None` | Tags for worker specialization / routing. | +| `pool` | `str` | `"thread"` | Worker pool type: `"thread"` or `"prefork"`. | +| `app` | `str \| None` | `None` | Import path to Queue (e.g. `"myapp:queue"`). Required when `pool="prefork"`. | + +### `queue.workers()` + +```python +queue.workers() -> list[dict] +``` + +Return live state of all registered workers. Each dict contains: + +| Key | Type | Description | +|-----|------|-------------| +| `worker_id` | `str` | Unique worker ID | +| `hostname` | `str` | OS hostname | +| `pid` | `int` | Process ID | +| `status` | `str` | `"active"` or `"draining"` | +| `pool_type` | `str` | `"thread"`, `"prefork"`, or `"native-async"` | +| `started_at` | `int` | Registration timestamp (ms since epoch) | +| `last_heartbeat` | `int` | Last heartbeat timestamp (ms) | +| `queues` | `str` | Comma-separated queue names | +| `threads` | `int` | Worker thread/process count | +| `tags` | `str \| None` | Worker specialization tags | +| `resources` | `str \| None` | Registered resource names (JSON) | +| `resource_health` | `str \| None` | Per-resource health status (JSON) | + +### `await queue.aworkers()` + +```python +await queue.aworkers() -> list[dict] +``` + +Async version of `workers()`. + +### `queue.arun_worker()` + +```python +await queue.arun_worker( + queues: Sequence[str] | None = None, + tags: list[str] | None = None, + pool: str = "thread", + app: str | None = None, +) -> None +``` + +Async version of `run_worker()`. Runs the blocking worker loop in a thread +executor so it does not block the asyncio event loop. Accepts the same `pool` +and `app` kwargs as the sync variant (`app` is required when `pool="prefork"`). + +## Circuit breakers + +### `queue.circuit_breakers()` + +```python +queue.circuit_breakers() -> list[dict] +``` + +Return current state of all circuit breakers: task name, state +(`closed`/`open`/`half-open`), failure count, last failure time. + +## Hooks + +### `@queue.before_task` + +```python +@queue.before_task +def hook(task_name: str, args: tuple, kwargs: dict) -> None: ... +``` + +### `@queue.after_task` + +```python +@queue.after_task +def hook(task_name: str, args: tuple, kwargs: dict, result: Any, error: Exception | None) -> None: ... +``` + +### `@queue.on_success` + +```python +@queue.on_success +def hook(task_name: str, args: tuple, kwargs: dict, result: Any) -> None: ... +``` + +### `@queue.on_failure` + +```python +@queue.on_failure +def hook(task_name: str, args: tuple, kwargs: dict, error: Exception) -> None: ... +``` + +## Async methods + +| Sync | Async | +|---|---| +| `queue.stats()` | `await queue.astats()` | +| `queue.stats_by_queue()` | `await queue.astats_by_queue()` | +| `queue.stats_all_queues()` | `await queue.astats_all_queues()` | +| `queue.dead_letters()` | `await queue.adead_letters()` | +| `queue.retry_dead()` | `await queue.aretry_dead()` | +| `queue.cancel_job()` | `await queue.acancel_job()` | +| `queue.run_worker()` | `await queue.arun_worker()` | +| `queue.metrics()` | `await queue.ametrics()` | +| `queue.workers()` | `await queue.aworkers()` | +| `queue.circuit_breakers()` | `await queue.acircuit_breakers()` | +| `queue.replay()` | `await queue.areplay()` | +| `queue.lock()` | `queue.alock()` (async context manager, not a coroutine) | +| `queue.resource_status()` | `await queue.aresource_status()` | From f2abecef7fac0f93c68a77bbe229b6276522a062 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 3 May 2026 01:45:47 +0530 Subject: [PATCH 2/2] docs(docs-next): port api-reference standalone classes Seven pages: task, result, context, canvas, workflows, testing, cli. Canvas mermaid diagrams use the existing component; warning admonitions become Fumadocs Callouts. --- .../content/docs/api-reference/canvas.mdx | 354 ++++++++++++++++++ docs-next/content/docs/api-reference/cli.mdx | 175 +++++++++ .../content/docs/api-reference/context.mdx | 158 ++++++++ .../content/docs/api-reference/result.mdx | 224 +++++++++++ docs-next/content/docs/api-reference/task.mdx | 140 +++++++ .../content/docs/api-reference/testing.mdx | 166 ++++++++ .../content/docs/api-reference/workflows.mdx | 301 +++++++++++++++ 7 files changed, 1518 insertions(+) create mode 100644 docs-next/content/docs/api-reference/canvas.mdx create mode 100644 docs-next/content/docs/api-reference/cli.mdx create mode 100644 docs-next/content/docs/api-reference/context.mdx create mode 100644 docs-next/content/docs/api-reference/result.mdx create mode 100644 docs-next/content/docs/api-reference/task.mdx create mode 100644 docs-next/content/docs/api-reference/testing.mdx create mode 100644 docs-next/content/docs/api-reference/workflows.mdx diff --git a/docs-next/content/docs/api-reference/canvas.mdx b/docs-next/content/docs/api-reference/canvas.mdx new file mode 100644 index 0000000..3e984d1 --- /dev/null +++ b/docs-next/content/docs/api-reference/canvas.mdx @@ -0,0 +1,354 @@ +--- +title: Canvas +description: "Workflow primitives — Signature, chain, group, chord, chunks, starmap." +--- + +Canvas primitives for composing task workflows. Import directly from the package: + +```python +from taskito import chain, group, chord, chunks, starmap +``` + +--- + +## Signature + +A frozen task call spec — describes *what* to call and *with what arguments*, +without executing it. + +### Creating signatures + +```python +# Mutable signature — receives previous result in chains +sig = add.s(2, 3) + +# Immutable signature — ignores previous result in chains +sig = add.si(2, 3) +``` + +### Fields + +| Field | Type | Description | +|---|---|---| +| `task` | `TaskWrapper` | The task to call | +| `args` | `tuple` | Positional arguments | +| `kwargs` | `dict` | Keyword arguments | +| `options` | `dict` | Enqueue options (priority, queue, etc.) | +| `immutable` | `bool` | If `True`, ignores previous result in chains | + +### `sig.apply()` + +```python +sig.apply(queue: Queue | None = None) -> JobResult +``` + +Enqueue this signature immediately. If `queue` is `None`, uses the task's +parent queue. + +```python +sig = add.s(2, 3) +job = sig.apply() +print(job.result(timeout=10)) # 5 +``` + +### `await sig.apply_async()` + +```python +await sig.apply_async(queue: Queue | None = None) -> JobResult +``` + +Async version of `apply()`. Safe to call from async contexts (FastAPI handlers, +etc.). + +### Mutable vs immutable + +In a [`chain`](#chain), the previous task's return value is **prepended** to a +mutable signature's args: + +```python +# add.s(10) in a chain where previous step returned 5: +# → add(5, 10) = 15 + +# add.si(2, 3) in a chain: +# → add(2, 3) = 5 (always, regardless of previous result) +``` + +--- + +## chain + +Execute signatures sequentially, piping each result to the next. + +### Constructor + +```python +chain(*signatures: Signature) +``` + +Requires at least one signature. + +### `chain.apply()` + +```python +chain.apply(queue: Queue | None = None) -> JobResult +``` + +Execute the chain by enqueuing and waiting for each step sequentially. Returns +the [`JobResult`](/docs/api-reference/result) of the **last** step. + +Each step's return value is prepended to the next mutable signature's args. +Immutable signatures (`task.si()`) receive their args as-is. + +```python +@queue.task() +def double(x): + return x * 2 + +@queue.task() +def add_ten(x): + return x + 10 + +# double(3) → 6, then add_ten(6) → 16 +result = chain(double.s(3), add_ten.s()).apply() +print(result.result(timeout=10)) # 16 +``` + +|"6"| B["add_ten(6)"] + B -->|"16"| C["Result: 16"]`} +/> + +### `await chain.apply_async()` + +```python +await chain.apply_async(queue: Queue | None = None) -> JobResult +``` + +Async version of `apply()`. Awaits each step's result using `aresult()` +instead of blocking with `result()`. Safe to call from async contexts. + +```python +result = await chain(double.s(3), add_ten.s()).apply_async() +value = await result.aresult(timeout=10) # 16 +``` + +--- + +## group + +Execute signatures in parallel and collect all results. + +### Constructor + +```python +group(*signatures: Signature) +``` + +Requires at least one signature. + +### `group.apply()` + +```python +group.apply(queue: Queue | None = None) -> list[JobResult] +``` + +Enqueue all signatures and return a list of +[`JobResult`](/docs/api-reference/result) handles. Jobs run concurrently +across available workers. + +```python +jobs = group( + add.s(1, 2), + add.s(3, 4), + add.s(5, 6), +).apply() + +results = [j.result(timeout=10) for j in jobs] +print(results) # [3, 7, 11] +``` + + D["Results: [3, 7, 11]"] + B["add(3,4)"] --> D + C["add(5,6)"] --> D`} +/> + +### `await group.apply_async()` + +```python +await group.apply_async(queue: Queue | None = None) -> list[JobResult] +``` + +Async version of `apply()`. With `max_concurrency`, uses `asyncio.gather` to +await each wave concurrently instead of blocking. + +--- + +## chord + +Run a group in parallel, then pass all results to a callback. + +### Constructor + +```python +chord(group_: group, callback: Signature) +``` + +| Parameter | Type | Description | +|---|---|---| +| `group_` | `group` | The group of tasks to run in parallel | +| `callback` | `Signature` | The task to call with all collected results | + +### `chord.apply()` + +```python +chord.apply(queue: Queue | None = None) -> JobResult +``` + +Execute the group, wait for all results, then run the callback with the list +of results prepended to its args (unless immutable). Returns the +[`JobResult`](/docs/api-reference/result) of the callback. + +```python +@queue.task() +def fetch(url): + return requests.get(url).text + +@queue.task() +def merge(results): + return "\n".join(results) + +result = chord( + group(fetch.s("https://a.com"), fetch.s("https://b.com")), + merge.s(), +).apply() + +combined = result.result(timeout=30) +``` + + C["merge([...])"] + B["fetch(b.com)"] --> C + C --> D["Combined result"]`} +/> + +### `await chord.apply_async()` + +```python +await chord.apply_async(queue: Queue | None = None) -> JobResult +``` + +Async version of `apply()`. Awaits all group results using `asyncio.gather`, +then enqueues the callback. + +--- + +## chunks + +Split an iterable into fixed-size chunks and process each chunk in parallel. + +### Constructor + +```python +chunks(task: TaskWrapper, items: list, chunk_size: int) -> group +``` + +| Parameter | Type | Description | +|---|---|---| +| `task` | `TaskWrapper` | The task to call for each chunk. Receives the chunk as a single positional argument. | +| `items` | `list` | The full list to split. Must be non-empty. | +| `chunk_size` | `int` | Items per chunk. Must be positive. | + +Returns a [`group`](#group) of signatures — one per chunk. Apply it the same +way as any other group. + +```python +@queue.task() +def process_batch(records): + return [transform(r) for r in records] + +records = load_records() # 10_000 items +jobs = chunks(process_batch, records, 100).apply() # → 100 parallel jobs + +results = [j.result(timeout=60) for j in jobs] +``` + +Raises `ValueError` if `chunk_size <= 0` or `items` is empty. + +--- + +## starmap + +Spread an iterable of argument tuples over parallel task invocations. + +### Constructor + +```python +starmap(task: TaskWrapper, args_list: list[tuple]) -> group +``` + +| Parameter | Type | Description | +|---|---|---| +| `task` | `TaskWrapper` | The task to call. Each tuple is unpacked into positional args. | +| `args_list` | `list[tuple]` | One tuple per invocation. Must be non-empty. | + +Returns a [`group`](#group) of signatures — one per tuple. Equivalent to +`group(task.s(*a) for a in args_list)`. + +```python +@queue.task() +def add(x, y): + return x + y + +jobs = starmap(add, [(1, 2), (3, 4), (5, 6)]).apply() +results = [j.result(timeout=10) for j in jobs] # [3, 7, 11] +``` + +Raises `ValueError` if `args_list` is empty. + +--- + +## Complete example + +An ETL pipeline using all three primitives: + +```python +from taskito import Queue, chain, group, chord + +queue = Queue() + +@queue.task() +def extract(source): + return load_data(source) + +@queue.task() +def transform(data): + return clean(data) + +@queue.task() +def aggregate(results): + return merge_datasets(results) + +@queue.task() +def load(data): + save_to_warehouse(data) + +# Extract from 3 sources in parallel, transform each, +# aggregate all results, then load +pipeline = chain( + chord( + group( + chain(extract.s("db"), transform.s()), + chain(extract.s("api"), transform.s()), + chain(extract.s("csv"), transform.s()), + ), + aggregate.s(), + ), + load.s(), +) + +result = pipeline.apply(queue) +``` diff --git a/docs-next/content/docs/api-reference/cli.mdx b/docs-next/content/docs/api-reference/cli.mdx new file mode 100644 index 0000000..bd68f64 --- /dev/null +++ b/docs-next/content/docs/api-reference/cli.mdx @@ -0,0 +1,175 @@ +--- +title: CLI Reference +description: "The taskito command-line interface — worker, info, scaler." +--- + +taskito provides a command-line interface for running workers and inspecting +queue state. + +## Installation + +The CLI is installed automatically with the package: + +```bash +pip install taskito +``` + +The `taskito` command becomes available in your `PATH`. + +## Commands + +### `taskito worker` + +Start a worker process that consumes and executes tasks. + +```bash +taskito worker --app [--queues ] +``` + +| Flag | Required | Description | +|---|---|---| +| `--app` | Yes | Python path to the `Queue` instance in `module:attribute` format | +| `--queues` | No | Comma-separated list of queues to process. Default: all registered queues | +| `--pool` | No | Worker pool type: `thread` (default) or `prefork` | + +**Examples:** + +```bash +# Start a worker using the queue defined in myapp/tasks.py +taskito worker --app myapp.tasks:queue + +# Only process the "emails" and "reports" queues +taskito worker --app myapp.tasks:queue --queues emails,reports + +# Use a nested module path +taskito worker --app myproject.workers.tasks:task_queue + +# Use the prefork pool +taskito worker --app myapp.tasks:queue --pool prefork +``` + +The worker blocks until interrupted with `Ctrl+C`. It performs a graceful +shutdown — in-flight tasks are allowed to complete before the process exits. + +### `taskito info` + +Display queue statistics. + +```bash +taskito info --app [--watch] +``` + +| Flag | Required | Description | +|---|---|---| +| `--app` | Yes | Python path to the `Queue` instance | +| `--watch` | No | Continuously refresh stats every 2 seconds | + +**Examples:** + +```bash +# Show stats once +taskito info --app myapp.tasks:queue +``` + +Output: + +``` +taskito queue statistics +------------------------------ + pending 12 + running 4 + completed 1847 + failed 0 + dead 2 + cancelled 0 +------------------------------ + total 1865 +``` + +```bash +# Live monitoring with throughput +taskito info --app myapp.tasks:queue --watch +``` + +Output (refreshes every 2s): + +``` +taskito queue statistics +------------------------------ + pending 3 + running 8 + completed 2104 + failed 0 + dead 2 + cancelled 0 +------------------------------ + total 2117 + + throughput 12.5 jobs/s + +Refreshing every 2s... (Ctrl+C to stop) +``` + +## App path format + +The `--app` flag uses `module:attribute` format: + +``` +myapp.tasks:queue +│ │ +│ └── attribute name (the Queue variable) +└── Python module path (dotted, importable) +``` + +The module must be importable from the current working directory. If your +module is in a package, make sure the package is installed or the parent +directory is in `PYTHONPATH`. + +**Common patterns:** + +| App structure | `--app` value | +|---|---| +| `tasks.py` with `queue = Queue()` | `tasks:queue` | +| `myapp/tasks.py` with `queue = Queue()` | `myapp.tasks:queue` | +| `src/workers/q.py` with `app = Queue()` | `src.workers.q:app` | + +### `taskito scaler` + +Start a lightweight KEDA metrics server. + +```bash +taskito scaler --app [--host ] [--port ] [--target-queue-depth ] +``` + +| Flag | Default | Description | +|---|---|---| +| `--app` | — | Python path to the `Queue` instance | +| `--host` | `0.0.0.0` | Bind address | +| `--port` | `9091` | Bind port | +| `--target-queue-depth` | `10` | Scaling target hint returned to KEDA in `/api/scaler` responses | + +The server exposes three routes: + +| Route | Description | +|---|---| +| `GET /api/scaler` | Queue depth and target for KEDA `metrics-api` trigger. Add `?queue=` to filter. | +| `GET /metrics` | Prometheus text format (requires `prometheus-client`). | +| `GET /health` | Liveness probe — always returns `{"status": "ok"}`. | + +**Example:** + +```bash +taskito scaler --app myapp:queue --port 9091 --target-queue-depth 5 +``` + +See the [KEDA Integration guide](/docs/guides/operations) for Kubernetes deploy +templates. + +## Error messages + +| Error | Cause | +|---|---| +| `--app must be in 'module:attribute' format` | Missing `:` separator | +| `could not import module '...'` | Module not found or import error | +| `module '...' has no attribute '...'` | Attribute doesn't exist on the module | +| `'...' is not a Queue instance` | The attribute exists but isn't a `Queue` | diff --git a/docs-next/content/docs/api-reference/context.mdx b/docs-next/content/docs/api-reference/context.mdx new file mode 100644 index 0000000..a35f386 --- /dev/null +++ b/docs-next/content/docs/api-reference/context.mdx @@ -0,0 +1,158 @@ +--- +title: Job Context +description: "Per-job context for the currently executing task — id, retry count, progress, publish." +--- + +import { Callout } from "fumadocs-ui/components/callout"; + +Per-job context for the currently executing task. Provides access to job +metadata and controls from inside a running task. + +## Usage + +```python +from taskito.context import current_job + +# or directly: +from taskito import current_job +``` + +`current_job` is a module-level singleton. It works in both sync and async tasks: + +- **Sync tasks** — reads from `threading.local`, isolated per worker thread. +- **Async tasks** — reads from a `contextvars.ContextVar`, isolated per + concurrent coroutine even when multiple async tasks run on the same event loop. + + + `current_job` can only be used inside a running task. Accessing it outside a + task raises `RuntimeError`. + + +## Properties + +### `current_job.id` + +```python +current_job.id -> str +``` + +The unique ID of the currently executing job. + +```python +@queue.task() +def process(data): + print(f"Running as job {current_job.id}") + ... +``` + +### `current_job.task_name` + +```python +current_job.task_name -> str +``` + +The registered name of the currently executing task. + +### `current_job.retry_count` + +```python +current_job.retry_count -> int +``` + +How many times this job has been retried. `0` on the first attempt. + +```python +@queue.task(max_retries=3) +def flaky_task(): + if current_job.retry_count > 0: + print(f"Retry attempt #{current_job.retry_count}") + call_external_api() +``` + +### `current_job.queue_name` + +```python +current_job.queue_name -> str +``` + +The name of the queue this job is running on. + +## Methods + +### `current_job.update_progress()` + +```python +current_job.update_progress(progress: int) -> None +``` + +Update the job's progress percentage (0–100). The value is written directly to +the database and can be read via [`job.progress`](/docs/api-reference/result) +or [`queue.get_job()`](/docs/api-reference/queue/jobs). + +```python +@queue.task() +def process_files(file_list): + for i, path in enumerate(file_list): + handle(path) + current_job.update_progress(int((i + 1) / len(file_list) * 100)) +``` + +Read progress from the caller: + +```python +job = process_files.delay(files) + +# Poll progress +import time +while job.status == "running": + print(f"Progress: {job.progress}%") + time.sleep(1) +``` + +### `current_job.publish()` + +```python +current_job.publish(data: Any) -> None +``` + +Publish a partial result visible to +[`job.stream()`](/docs/api-reference/result) consumers. Use this to stream +intermediate data from long-running tasks. + +`data` must be JSON-serializable. It is stored as a task log entry with +`level="result"`, distinguishing it from regular logs. + +```python +@queue.task() +def process_batch(items): + for i, item in enumerate(items): + result = process(item) + current_job.publish({"item_id": item.id, "status": "ok"}) + current_job.update_progress(int((i + 1) / len(items) * 100)) + return {"total": len(items)} +``` + +Consumer side: + +```python +job = process_batch.delay(items) +for partial in job.stream(timeout=120): + print(f"Processed: {partial}") +``` + +## How it works + +**Sync tasks (thread pool):** + +1. Before execution, the Rust worker calls `_set_context()` with the job's metadata +2. `current_job` reads from `threading.local` — each worker thread has independent storage +3. After the task completes (success or failure), `_clear_context()` resets the thread-local + +**Async tasks (native async pool):** + +1. Before execution, `set_async_context()` sets a `contextvars.ContextVar` token +2. `current_job` checks `contextvars` first; if a token is set it returns that context +3. After the coroutine finishes, `clear_async_context()` resets the token + +This means concurrent async tasks on the same event loop each see their own +isolated context — there is no cross-task interference. diff --git a/docs-next/content/docs/api-reference/result.mdx b/docs-next/content/docs/api-reference/result.mdx new file mode 100644 index 0000000..9e6dcb0 --- /dev/null +++ b/docs-next/content/docs/api-reference/result.mdx @@ -0,0 +1,224 @@ +--- +title: JobResult +description: "Handle for an enqueued job — status polling, result retrieval, dependencies." +--- + +Handle to an enqueued job. Provides methods to check status and retrieve +results, both synchronously and asynchronously. + +Returned by [`task.delay()`](/docs/api-reference/task), +[`task.apply_async()`](/docs/api-reference/task), +[`queue.enqueue()`](/docs/api-reference/queue), and canvas operations. + +## Properties + +### `job.id` + +```python +job.id -> str +``` + +The unique job ID (UUIDv7, time-ordered). + +### `job.status` + +```python +job.status -> str +``` + +Current job status. **Fetches fresh from the database** on every access. + +Returns one of: `"pending"`, `"running"`, `"complete"`, `"failed"`, `"dead"`, +`"cancelled"`. + +```python +job = add.delay(2, 3) +print(job.status) # "pending" +# ... after worker processes it ... +print(job.status) # "complete" +``` + +### `job.progress` + +```python +job.progress -> int | None +``` + +Current progress (0–100) if reported by the task via +[`current_job.update_progress()`](/docs/api-reference/context). Returns `None` +if no progress has been reported. Refreshes from the database. + +### `job.error` + +```python +job.error -> str | None +``` + +Error message if the job failed. `None` if the job hasn't failed. Refreshes +from the database. + +### `job.errors` + +```python +job.errors -> list[dict] +``` + +Full error history for this job — one entry per failed attempt. Each dict +contains: + +| Key | Type | Description | +|---|---|---| +| `id` | `str` | Error record ID | +| `job_id` | `str` | Parent job ID | +| `attempt` | `int` | Retry attempt number | +| `error` | `str` | Error message/traceback | +| `failed_at` | `str` | ISO timestamp of the failure | + +```python +job = flaky_task.delay() +# ... after retries ... +for err in job.errors: + print(f"Attempt {err['attempt']}: {err['error']}") +``` + +### `job.dependencies` + +```python +job.dependencies -> list[str] +``` + +List of job IDs this job depends on. Returns an empty list if the job has no +dependencies. See [Dependencies](/docs/guides/advanced-execution). + +### `job.dependents` + +```python +job.dependents -> list[str] +``` + +List of job IDs that depend on this job. Returns an empty list if no other +jobs depend on this one. + +## Methods + +### `job.to_dict()` + +```python +job.to_dict() -> dict +``` + +Return all job fields as a plain dictionary. Useful for JSON serialization +(e.g. in the [dashboard](/docs/guides/observability) or +[FastAPI integration](/docs/guides/integrations)). + +### `job.result()` + +```python +job.result( + timeout: float = 30.0, + poll_interval: float = 0.05, + max_poll_interval: float = 0.5, +) -> Any +``` + +**Block** until the job completes and return the deserialized result. Uses +exponential backoff for polling — starts at `poll_interval` and gradually +increases to `max_poll_interval`. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `timeout` | `float` | `30.0` | Maximum seconds to wait | +| `poll_interval` | `float` | `0.05` | Initial seconds between status checks | +| `max_poll_interval` | `float` | `0.5` | Maximum seconds between status checks | + +**Raises:** + +- `TimeoutError` — if the job doesn't complete within `timeout` +- `TaskFailedError` — if the job status is `"failed"` +- `MaxRetriesExceededError` — if the job status is `"dead"` (all retries exhausted) +- `TaskCancelledError` — if the job status is `"cancelled"` +- `SerializationError` — if result deserialization fails + +```python +from taskito import TaskFailedError, MaxRetriesExceededError, TaskCancelledError + +job = add.delay(2, 3) +result = job.result(timeout=10) # blocks, returns 5 + +# Handle specific failure modes +try: + result = job.result(timeout=10) +except TaskCancelledError: + print("Job was cancelled") +except MaxRetriesExceededError: + print("Job exhausted all retries") +except TaskFailedError: + print("Job failed") +``` + +### `await job.aresult()` + +```python +await job.aresult( + timeout: float = 30.0, + poll_interval: float = 0.05, + max_poll_interval: float = 0.5, +) -> Any +``` + +Async version of `result()`. Uses `asyncio.sleep()` instead of `time.sleep()`, +so it won't block the event loop. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `timeout` | `float` | `30.0` | Maximum seconds to wait | +| `poll_interval` | `float` | `0.05` | Initial seconds between status checks | +| `max_poll_interval` | `float` | `0.5` | Maximum seconds between status checks | + +**Raises:** same as `result()`. + +```python +job = add.delay(2, 3) +result = await job.aresult(timeout=10) +``` + +### `job.stream()` + +```python +job.stream( + timeout: float = 60.0, + poll_interval: float = 0.5, +) -> Iterator[Any] +``` + +Iterate over partial results published by the task via +[`current_job.publish()`](/docs/api-reference/context). Yields each result as +it arrives, stops when the job reaches a terminal state. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `timeout` | `float` | `60.0` | Maximum seconds to wait | +| `poll_interval` | `float` | `0.5` | Seconds between polls | + +```python +job = batch_process.delay(items) +for partial in job.stream(timeout=120): + print(f"Got: {partial}") +``` + +### `await job.astream()` + +```python +async for partial in job.astream( + timeout: float = 60.0, + poll_interval: float = 0.5, +) -> AsyncIterator[Any] +``` + +Async version of `stream()`. Uses `asyncio.sleep` so it won't block the event +loop. + +```python +async for partial in job.astream(timeout=120): + print(f"Got: {partial}") +``` diff --git a/docs-next/content/docs/api-reference/task.mdx b/docs-next/content/docs/api-reference/task.mdx new file mode 100644 index 0000000..2a74cc6 --- /dev/null +++ b/docs-next/content/docs/api-reference/task.mdx @@ -0,0 +1,140 @@ +--- +title: TaskWrapper +description: "Handle returned by @queue.task() — delay, apply_async, map, and signature builders." +--- + +Created by `@queue.task()` — not instantiated directly. Wraps a decorated +function to provide task submission methods. + +## Properties + +### `task.name` + +```python +task.name -> str +``` + +The registered task name. Either the explicit `name` passed to `@queue.task()` +or the function's qualified name. + +## Methods + +### `task.delay()` + +```python +task.delay(*args, **kwargs) -> JobResult +``` + +Enqueue the task for background execution using the decorator's default options. +Returns a [`JobResult`](/docs/api-reference/result) handle. + +```python +@queue.task(priority=5) +def add(a, b): + return a + b + +job = add.delay(2, 3) +print(job.result(timeout=10)) # 5 +``` + +### `task.apply_async()` + +```python +task.apply_async( + args: tuple = (), + kwargs: dict | None = None, + priority: int | None = None, + delay: float | None = None, + queue: str | None = None, + max_retries: int | None = None, + timeout: int | None = None, + unique_key: str | None = None, + metadata: str | None = None, + depends_on: str | list[str] | None = None, +) -> JobResult +``` + +Enqueue with full control over submission options. Any parameter not provided +falls back to the decorator's default. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `args` | `tuple` | `()` | Positional arguments for the task | +| `kwargs` | `dict \| None` | `None` | Keyword arguments for the task | +| `priority` | `int \| None` | `None` | Override priority (higher = more urgent) | +| `delay` | `float \| None` | `None` | Delay in seconds before the task is eligible | +| `queue` | `str \| None` | `None` | Override queue name | +| `max_retries` | `int \| None` | `None` | Override max retry count | +| `timeout` | `int \| None` | `None` | Override timeout in seconds | +| `unique_key` | `str \| None` | `None` | Deduplicate active jobs with same key | +| `metadata` | `str \| None` | `None` | Arbitrary JSON metadata to attach | +| `depends_on` | `str \| list[str] \| None` | `None` | Job ID(s) this job depends on. See [Dependencies](/docs/guides/advanced-execution). | + +```python +job = send_email.apply_async( + args=("user@example.com", "Hello"), + priority=10, + delay=3600, + queue="emails", + unique_key="welcome-user@example.com", + metadata='{"campaign": "onboarding"}', +) +``` + +### `task.map()` + +```python +task.map(iterable: list[tuple]) -> list[JobResult] +``` + +Enqueue one job per item in a single batch SQLite transaction. Uses the +decorator's default options. + +```python +jobs = add.map([(1, 2), (3, 4), (5, 6)]) +results = [j.result(timeout=10) for j in jobs] +print(results) # [3, 7, 11] +``` + +### `task.s()` + +```python +task.s(*args, **kwargs) -> Signature +``` + +Create a **mutable** [`Signature`](/docs/api-reference/canvas). In a +[`chain`](/docs/api-reference/canvas), the previous task's return value is +prepended to `args`. + +```python +sig = add.s(10) +# In a chain, if the previous step returned 5: +# add(5, 10) → 15 +``` + +### `task.si()` + +```python +task.si(*args, **kwargs) -> Signature +``` + +Create an **immutable** [`Signature`](/docs/api-reference/canvas). Ignores the +previous task's result — arguments are used as-is. + +```python +sig = add.si(2, 3) +# Always calls add(2, 3) regardless of previous result +``` + +### `task()` + +```python +task(*args, **kwargs) -> Any +``` + +Call the underlying function directly (synchronous, not queued). Useful for +testing or when you don't need background execution. + +```python +result = add(2, 3) # Direct call, returns 5 +``` diff --git a/docs-next/content/docs/api-reference/testing.mdx b/docs-next/content/docs/api-reference/testing.mdx new file mode 100644 index 0000000..8d6b7f2 --- /dev/null +++ b/docs-next/content/docs/api-reference/testing.mdx @@ -0,0 +1,166 @@ +--- +title: Testing +description: "Test mode, TestResult, and MockResource for unit testing tasks." +--- + +## `TestMode` + +```python +from taskito.testing import TestMode +# or use the shortcut: +# queue.test_mode() +``` + +Context manager that intercepts `enqueue()` to run tasks synchronously. No +worker, no Rust, no SQLite. + +### Constructor + +```python +TestMode(queue: Queue, propagate_errors: bool = False, resources: dict[str, Any] | None = None) +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `queue` | `Queue` | *required* | The Queue instance to put into test mode | +| `propagate_errors` | `bool` | `False` | Re-raise task exceptions immediately instead of capturing them | +| `resources` | `dict[str, Any] \| None` | `None` | Resource name → mock instance map injected during test mode. `MockResource` values are unwrapped automatically. | + +### Usage + +```python +with TestMode(queue) as results: + my_task.delay(42) + assert results[0].return_value == expected + +# Shortcut via Queue: +with queue.test_mode() as results: + my_task.delay(42) + +# With mock resources: +with queue.test_mode(resources={"db": mock_session}) as results: + create_user.delay("Alice") +``` + +--- + +## `TestResult` + +```python +from taskito import TestResult +``` + +Dataclass capturing the result of a single task execution in test mode. + +### Attributes + +| Attribute | Type | Description | +|---|---|---| +| `job_id` | `str` | Synthetic test ID (e.g. `"test-000001"`) | +| `task_name` | `str` | Fully qualified name of the task | +| `args` | `tuple` | Positional arguments the task was called with | +| `kwargs` | `dict` | Keyword arguments the task was called with | +| `return_value` | `Any` | Return value of the task (or `None` if it failed) | +| `error` | `Exception \| None` | Exception instance if the task raised | +| `traceback` | `str \| None` | Formatted traceback string if the task raised | + +### Properties + +| Property | Type | Description | +|---|---|---| +| `succeeded` | `bool` | `True` if `error is None` | +| `failed` | `bool` | `True` if `error is not None` | + +--- + +## `TestResults` + +```python +from taskito import TestResults +``` + +A `list[TestResult]` subclass with convenience filtering methods. + +### Properties + +| Property | Returns | Description | +|---|---|---| +| `succeeded` | `TestResults` | All results where `succeeded is True` | +| `failed` | `TestResults` | All results where `failed is True` | + +### Methods + +#### `.filter()` + +```python +results.filter(task_name: str | None = None, succeeded: bool | None = None) -> TestResults +``` + +Filter results by task name and/or outcome. + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `task_name` | `str \| None` | `None` | Exact match on task name | +| `succeeded` | `bool \| None` | `None` | `True` = successes only, `False` = failures only | + +Returns a new `TestResults` containing only matching items. + +```python +results.filter(task_name="myapp.send_email") +results.filter(succeeded=False) +results.filter(task_name="myapp.process", succeeded=True) +``` + +--- + +## `MockResource` + +```python +from taskito import MockResource +``` + +Test double for a worker resource with optional call tracking. Pass instances +to `queue.test_mode(resources=...)`. + +### Constructor + +```python +MockResource( + name: str, + return_value: Any = None, + wraps: Any = None, + track_calls: bool = False, +) +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `name` | `str` | *required* | Resource name (informational). | +| `return_value` | `Any` | `None` | Value returned when the resource is accessed via `.get()`. | +| `wraps` | `Any` | `None` | A real object to wrap — returned as-is from `.get()`. | +| `track_calls` | `bool` | `False` | Increment `call_count` each time `.get()` is called. | + +### Attributes + +| Attribute | Type | Description | +|---|---|---| +| `call_count` | `int` | Number of times the resource was accessed. | +| `calls` | `list` | Reserved for future per-call argument tracking. | + +### Usage + +```python +from taskito import MockResource + +# Simple mock value +mock_db = MockResource("db", return_value=FakeSessionFactory()) + +# Wrap a real object with call tracking +spy = MockResource("db", wraps=real_session_factory, track_calls=True) + +with queue.test_mode(resources={"db": spy}) as results: + process_order.delay(42) + +assert spy.call_count == 1 +assert results[0].succeeded +``` diff --git a/docs-next/content/docs/api-reference/workflows.mdx b/docs-next/content/docs/api-reference/workflows.mdx new file mode 100644 index 0000000..52cedad --- /dev/null +++ b/docs-next/content/docs/api-reference/workflows.mdx @@ -0,0 +1,301 @@ +--- +title: Workflows +description: "DAG workflow builder, run handles, gate APIs, and supporting types." +--- + +DAG workflow builder, execution handles, and analysis tools. + +## `Workflow` + +Builder for a workflow DAG. + +### Constructor + +```python +Workflow( + name: str = "workflow", + version: int = 1, + on_failure: str = "fail_fast", + cache_ttl: float | None = None, +) +``` + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `name` | `str` | `"workflow"` | Workflow name (used for definition storage) | +| `version` | `int` | `1` | Version number | +| `on_failure` | `str` | `"fail_fast"` | Error strategy: `"fail_fast"` or `"continue"` | +| `cache_ttl` | `float \| None` | `None` | Cache TTL in seconds for incremental runs | + +### `step()` + +```python +wf.step( + name: str, + task: TaskWrapper, + *, + after: str | list[str] | None = None, + args: tuple = (), + kwargs: dict | None = None, + queue: str | None = None, + max_retries: int | None = None, + timeout_ms: int | None = None, + priority: int | None = None, + fan_out: str | None = None, + fan_in: str | None = None, + condition: str | Callable | None = None, +) -> Workflow +``` + +Add a task step. Returns `self` for chaining. + +### `gate()` + +```python +wf.gate( + name: str, + *, + after: str | list[str] | None = None, + condition: str | Callable | None = None, + timeout: float | None = None, + on_timeout: str = "reject", + message: str | Callable | None = None, +) -> Workflow +``` + +Add an approval gate step. + +### `visualize()` + +```python +wf.visualize(fmt: str = "mermaid") -> str +``` + +Render the DAG as a Mermaid or DOT diagram string. + +### `ancestors()` / `descendants()` + +```python +wf.ancestors(node: str) -> list[str] +wf.descendants(node: str) -> list[str] +``` + +### `topological_levels()` + +```python +wf.topological_levels() -> list[list[str]] +``` + +### `stats()` + +```python +wf.stats() -> dict[str, int | float] +``` + +Returns `{nodes, edges, depth, width, density}`. + +### `critical_path()` + +```python +wf.critical_path(costs: dict[str, float]) -> tuple[list[str], float] +``` + +Returns `(path, total_cost)` — the longest-weighted path. + +### `execution_plan()` + +```python +wf.execution_plan(max_workers: int = 1) -> list[list[str]] +``` + +### `bottleneck_analysis()` + +```python +wf.bottleneck_analysis(costs: dict[str, float]) -> dict[str, Any] +``` + +Returns `{node, cost, percentage, critical_path, total_cost, suggestion}`. + +--- + +## `WorkflowRun` + +Handle for a submitted workflow run. + +### `status()` + +```python +run.status() -> WorkflowStatus +``` + +### `wait()` + +```python +run.wait(timeout: float | None = None, poll_interval: float = 0.1) -> WorkflowStatus +``` + +Block until the workflow reaches a terminal state. Raises `WorkflowTimeoutError` +on timeout. + +### `cancel()` + +```python +run.cancel() -> None +``` + +### `node_status()` + +```python +run.node_status(node_name: str) -> NodeStatus +``` + +### `visualize()` + +```python +run.visualize(fmt: str = "mermaid") -> str +``` + +Render the DAG with live node status colors. + +--- + +## `WorkflowProxy` + +Returned by `@queue.workflow()`. Wraps a factory function. + +### `submit()` + +```python +proxy.submit(*args, **kwargs) -> WorkflowRun +``` + +Build and submit the workflow. + +### `build()` + +```python +proxy.build(*args, **kwargs) -> Workflow +``` + +Materialize without submitting. + +### `as_step()` + +```python +proxy.as_step(**params) -> SubWorkflowRef +``` + +Return a reference for use as a sub-workflow step. + +--- + +## Queue methods + +Added to `Queue` via `QueueWorkflowMixin`: + +### `submit_workflow()` + +```python +queue.submit_workflow( + workflow: Workflow, + *, + incremental: bool = False, + base_run: str | None = None, +) -> WorkflowRun +``` + +### `approve_gate()` + +```python +queue.approve_gate(run_id: str, node_name: str) -> None +``` + +### `reject_gate()` + +```python +queue.reject_gate(run_id: str, node_name: str, error: str = "rejected") -> None +``` + +### `@queue.workflow()` + +```python +@queue.workflow(name: str | None = None, *, version: int = 1) +def factory() -> Workflow: ... +``` + +--- + +## Types + +### `WorkflowState` + +```python +class WorkflowState(str, Enum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + PAUSED = "paused" +``` + +### `NodeStatus` + +```python +class NodeStatus(str, Enum): + PENDING = "pending" + READY = "ready" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + SKIPPED = "skipped" + WAITING_APPROVAL = "waiting_approval" + CACHE_HIT = "cache_hit" +``` + +### `WorkflowStatus` + +```python +@dataclass +class WorkflowStatus: + run_id: str + state: WorkflowState + started_at: int | None + completed_at: int | None + error: str | None + nodes: dict[str, NodeSnapshot] +``` + +### `NodeSnapshot` + +```python +@dataclass +class NodeSnapshot: + name: str + status: NodeStatus + job_id: str | None + error: str | None +``` + +### `WorkflowContext` + +```python +@dataclass(frozen=True) +class WorkflowContext: + run_id: str + results: dict[str, Any] + statuses: dict[str, str] + params: dict[str, Any] | None + failure_count: int + success_count: int +``` + +### `GateConfig` + +```python +@dataclass +class GateConfig: + timeout: float | None = None + on_timeout: str = "reject" + message: str | Callable | None = None +```