diff --git a/Makefile b/Makefile deleted file mode 100644 index c74ddd12..00000000 --- a/Makefile +++ /dev/null @@ -1,9 +0,0 @@ -.PHONY: docs docs-serve - -docs: - uv run zensical build - @echo "Copying markdown sources..." - cd docs && find . -name '*.md' -exec install -D {} ../site/_sources/{} \; - -docs-serve: - uv run zensical serve diff --git a/dashboard/src/routes/webhooks.$id.deliveries.tsx b/dashboard/src/routes/webhooks_.$id.deliveries.tsx similarity index 97% rename from dashboard/src/routes/webhooks.$id.deliveries.tsx rename to dashboard/src/routes/webhooks_.$id.deliveries.tsx index cf1052d4..37d52c59 100644 --- a/dashboard/src/routes/webhooks.$id.deliveries.tsx +++ b/dashboard/src/routes/webhooks_.$id.deliveries.tsx @@ -15,7 +15,7 @@ import { import type { DeliveryStatus } from "@/features/webhooks"; import { DeliveryListTable, useDeliveries, useWebhooks } from "@/features/webhooks"; -export const Route = createFileRoute("/webhooks/$id/deliveries")({ +export const Route = createFileRoute("/webhooks_/$id/deliveries")({ component: DeliveriesPage, }); diff --git a/docs/content/docs/guides/extensibility/events-webhooks.mdx b/docs/content/docs/guides/extensibility/events-webhooks.mdx index 03d13d74..d2e4775b 100644 --- a/docs/content/docs/guides/extensibility/events-webhooks.mdx +++ b/docs/content/docs/guides/extensibility/events-webhooks.mdx @@ -1,17 +1,28 @@ --- title: Events & Webhooks -description: "In-process event bus and HMAC-signed HTTP webhooks for job and worker lifecycle events." +description: "In-process event bus, dashboard-managed HMAC-signed webhooks, persistent delivery log, and replay." --- -taskito includes an in-process event bus and webhook delivery system for -reacting to job lifecycle events. +import { Callout } from "fumadocs-ui/components/callout"; +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +taskito has two complementary primitives for reacting to job lifecycle: + +1. 
**In-process event bus** — `queue.on_event()` registers Python callbacks + dispatched in a thread pool. Same process, lowest latency, no + serialization, no HTTP. +2. **Webhooks** — HMAC-signed HTTP POSTs to external endpoints, managed + from the dashboard (or the Python API), with a persistent delivery + log and one-click replay. + +This guide covers both, starting with the events that drive them. ## Event types The `EventType` enum defines all available lifecycle events: -| Event | Fired when | Payload fields | -|-------|------------|----------------| +| Event | Fires when | Payload fields | +|---|---|---| | `JOB_ENQUEUED` | A job is added to the queue | `job_id`, `task_name`, `queue` | | `JOB_COMPLETED` | A job finishes successfully | `job_id`, `task_name`, `queue` | | `JOB_FAILED` | A job raises an exception (before retry) | `job_id`, `task_name`, `queue`, `error` | @@ -26,18 +37,11 @@ The `EventType` enum defines all available lifecycle events: | `QUEUE_PAUSED` | A named queue is paused | `queue` | | `QUEUE_RESUMED` | A paused queue is resumed | `queue` | -`JOB_RETRYING`, `JOB_DEAD`, and `JOB_CANCELLED` are emitted by the Rust -result handler immediately after the scheduler records the outcome. -Middleware hooks (`on_retry`, `on_dead_letter`, `on_cancel`) are called in -the same result-handling pass, after the event fires. +## In-process listeners -`QUEUE_PAUSED` and `QUEUE_RESUMED` are emitted synchronously by -`queue.pause()` and `queue.resume()` after the queue state is written to -storage. - -## Registering listeners - -Use `queue.on_event()` to subscribe a callback to a specific event type: +Use `queue.on_event()` to subscribe a callback. Callbacks run in a +`ThreadPoolExecutor` so they never block the worker, and exceptions are +logged but don't affect job processing. 
```python from taskito import Queue @@ -45,66 +49,97 @@ from taskito.events import EventType queue = Queue(db_path="tasks.db") -def on_failure(event_type: EventType, payload: dict): +def on_failure(event_type: EventType, payload: dict) -> None: print(f"Job {payload['job_id']} failed: {payload.get('error')}") queue.on_event(EventType.JOB_FAILED, on_failure) ``` -### Callback signature +Configure the pool size via `Queue(event_workers=N)` (default 4) if +your callbacks are slow. + +## Webhooks (dashboard-managed) -All callbacks receive two arguments: +Webhook subscriptions are first-class **persisted** resources — they survive +restarts, propagate across every worker pointed at the same backend, +and are fully manageable from the dashboard. The same surface is +available programmatically via `queue.add_webhook()` / +`list_webhooks()` / `update_webhook()` / etc. -- `event_type` (`EventType`) — the event that occurred -- `payload` (`dict`) — event details including `job_id`, `task_name`, `queue`, and event-specific fields +### Configure from the dashboard -### Async delivery +The Webhooks page (sidebar → Configuration → Webhooks) lists every +subscription with its URL, event filter, optional task filter, retry +policy, and status. -Callbacks are dispatched asynchronously in a `ThreadPoolExecutor`. The -thread pool size defaults to 4 and can be configured via -`Queue(event_workers=N)`. This means: +![Webhooks page with three subscriptions](/screenshots/dashboard/webhooks-list.png) -- Callbacks never block the worker -- Exceptions in callbacks are logged but do not affect job processing -- Callbacks may execute slightly after the event occurs +Click **+ New webhook** to add an endpoint. The dialog walks you +through URL, optional description, the event-type multi-select, an +optional per-task filter, and a checkbox to auto-generate an +HMAC-SHA256 signing secret. 
-## Webhooks +![New webhook dialog](/screenshots/dashboard/webhook-create-dialog.png) -For external systems, register webhook URLs to receive HTTP POST requests -on job events. +After save, the new secret is shown **once** in a copy-and-reveal card +— treat it like an API key. The same flow applies when you rotate the +secret later from the row-actions menu. -### Registering a webhook +### Per-row actions + +Each row has a "⋯" menu: + +| Action | Effect | +|---|---| +| **View deliveries** | Open the persistent delivery log (see below) | +| **Send test** | POST a synthetic `test.ping` event synchronously and toast the result | +| **Enable / Disable** | Flip the active flag without losing the configuration | +| **Rotate secret** | Generate a new HMAC secret. Confirm dialog prevents accidents | +| **Delete** | Type-to-confirm destructive dialog removes the subscription | + +### Configure from Python + +The same operations are available programmatically: ```python -queue.add_webhook( - url="https://example.com/hooks/taskito", +from taskito import Queue +from taskito.events import EventType + +queue = Queue(db_path="tasks.db") + +sub = queue.add_webhook( + url="https://hooks.example.com/ops-failures", events=[EventType.JOB_FAILED, EventType.JOB_DEAD], - headers={"Authorization": "Bearer mytoken"}, - secret="my-signing-secret", + secret="whsec_my_signing_secret", + description="Page ops on permanent failures", + max_retries=5, + timeout=8.0, + task_filter=["myapp.tasks.send_email"], # optional per-task gate ) +print(sub.id) # use this to update / remove later + +queue.update_webhook(sub.id, enabled=False) +queue.rotate_webhook_secret(sub.id) +queue.remove_webhook(sub.id) ``` | Parameter | Type | Default | Description | -|-----------|------|---------|-------------| -| `url` | `str` | — | URL to POST event payloads to (must be `http://` or `https://`) | +|---|---|---|---| +| `url` | `str` | — | http/https URL. 
SSRF-guarded — see below | | `events` | `list[EventType] \| None` | `None` | Event types to subscribe to. `None` means all events | -| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers to include in requests | -| `secret` | `str \| None` | `None` | HMAC-SHA256 signing secret | +| `task_filter` | `list[str] \| None` | `None` | Restrict to specific task names. `None` means all tasks | +| `headers` | `dict[str, str] \| None` | `None` | Extra HTTP headers (e.g. `Authorization`) | +| `secret` | `str \| None` | `None` | HMAC-SHA256 signing key | | `max_retries` | `int` | `3` | Maximum delivery attempts | | `timeout` | `float` | `10.0` | HTTP request timeout in seconds | | `retry_backoff` | `float` | `2.0` | Base for exponential backoff between retries | +| `description` | `str \| None` | `None` | Free-form label shown in the dashboard | ### HMAC signing -When a `secret` is provided, each webhook request includes an -`X-Taskito-Signature` header: - -``` -X-Taskito-Signature: sha256= -``` - -The signature is computed over the JSON request body using HMAC-SHA256. -Verify it on the receiving end: +When a `secret` is set, every webhook request includes +`X-Taskito-Signature: sha256=<hex-digest>`. Verify it on the receiving +end: ```python import hashlib @@ -115,46 +150,93 @@ def verify_signature(body: bytes, signature: str, secret: str) -> bool: return hmac.compare_digest(f"sha256={expected}", signature) ``` -### Retry behavior +The signature is computed over the raw JSON request body. **Verify +before parsing the body** — that way a forged payload never reaches +your business logic. + + + The secret column stores the value as-is in the dashboard settings + table. The DB is already trusted with everything else taskito + persists (job payloads, error tracebacks). If you need at-rest + encryption beyond filesystem-level (e.g. 
SQLite encrypted with + SQLCipher), provide it at the storage layer — taskito stores the secret as-is. Either way, the dashboard never returns the raw secret after the + initial create / rotate response — only a ``has_secret`` indicator. + -Failed webhook deliveries are retried with exponential backoff. The number -of attempts, request timeout, and backoff base are configurable per webhook -via `max_retries`, `timeout`, and `retry_backoff`. With the defaults -(`max_retries=3`, `retry_backoff=2.0`): +### SSRF guard + +Outbound webhook URLs are validated before the manager queues any +delivery. The guard rejects: + +- Non-`http` / `https` schemes +- `localhost`, `*.local`, `*.internal`, `*.intranet`, `*.lan`, + `*.private` +- Any address that resolves to a loopback, link-local, RFC1918, + multicast, or unspecified range (including the AWS metadata service + at `169.254.169.254`) + +Set `TASKITO_WEBHOOKS_ALLOW_PRIVATE=1` to lift the guard for local +development. Keep the guard enabled in production. + +### Retry behaviour + +Failed webhook deliveries are retried with exponential backoff, +configurable per subscription via `max_retries`, `timeout`, and +`retry_backoff`. With the defaults (`max_retries=3`, +`retry_backoff=2.0`): | Attempt | Delay before next retry | -|---------|------------------------| +|---|---| | 1st retry | 1 second (`2.0 ** 0`) | | 2nd retry | 2 seconds (`2.0 ** 1`) | | 3rd retry | — (final) | -4xx responses are not retried. If all attempts fail, a warning is logged -and the event is dropped. +4xx responses are NOT retried — they're treated as client errors and +the delivery is marked `failed`. 5xx responses retry until exhausted, +at which point the delivery is marked `dead`. -### Event filtering +## Delivery log + replay -Subscribe to specific events or all events: +Every webhook attempt — successful, failed, or dead-lettered — is +recorded under the subscription so you can debug failures without +leaving the dashboard. 
-```python -# Only failure events -queue.add_webhook( - url="https://slack.example.com/webhook", - events=[EventType.JOB_FAILED, EventType.JOB_DEAD], -) +![Delivery log with mixed outcomes](/screenshots/dashboard/webhook-deliveries.png) -# All events -queue.add_webhook(url="https://monitoring.example.com/events") -``` +Each row carries: + +- **When** — relative timestamp ("2 minutes ago") +- **Event** — the event type the delivery was for (`job.completed`, `job.failed`, etc.) +- **Status** — `delivered` (green), `failed` (yellow), `dead` (red) +- **Code** — final HTTP status returned by the endpoint +- **Latency** — wall time of the final attempt +- **Attempts** — total number of delivery attempts made + +Click any row to inspect the full payload, the truncated response body +(first 2 KiB), and any transport-level error. The **Replay** button +re-fires the stored payload synchronously and records the outcome as a +fresh delivery — the original record is preserved for the audit trail. + +### Retention + +Each subscription keeps its most recent 200 deliveries by default in a FIFO ring +buffer (configurable via the +`DeliveryStore(max_per_webhook=N)` constructor). Successful and +failed deliveries are stored uniformly so the replay view always +matches what really happened. 
## Examples -### Slack notification on job failure +### Slack-on-failure (in-process listener) + +When the receiver is the same Python process and you don't need +persistence, the event bus is the cheapest path: ```python import requests from taskito.events import EventType -def notify_slack(event_type: EventType, payload: dict): +def notify_slack(event_type: EventType, payload: dict) -> None: requests.post( "https://hooks.slack.com/services/T.../B.../xxx", json={ @@ -168,7 +250,11 @@ queue.on_event(EventType.JOB_FAILED, notify_slack) queue.on_event(EventType.JOB_DEAD, notify_slack) ``` -### Webhook to external monitoring +### Persistent webhook to an external service + +When the receiver is a separate service — auditing, monitoring, a +different team's pipeline — use a webhook so the delivery log is +preserved across restarts: ```python queue.add_webhook( @@ -176,58 +262,73 @@ queue.add_webhook( events=[EventType.JOB_COMPLETED, EventType.JOB_FAILED, EventType.JOB_DEAD], secret="whsec_abc123", headers={"X-Source": "taskito-prod"}, + description="Forward terminal job outcomes to the monitoring service", ) ``` -The monitoring service receives JSON payloads like: +Payload shape received by the endpoint: ```json { - "event": "job.failed", - "job_id": "01H5K6X...", - "task_name": "myapp.tasks.process", - "queue": "default", - "error": "ConnectionError: ..." + "event": "job.failed", + "job_id": "01H5K6X...", + "task_name": "myapp.tasks.process", + "queue": "default", + "error": "ConnectionError: ..." 
} ``` -### Job completion tracking +### Flask receiver + +A minimal Flask app that receives and verifies taskito webhooks: ```python -from taskito.events import EventType +from flask import Flask, request, abort +import hashlib, hmac -completed_count = 0 +app = Flask(__name__) +WEBHOOK_SECRET = "whsec_my_signing_secret" -def track_completion(event_type: EventType, payload: dict): - global completed_count - completed_count += 1 - if completed_count % 100 == 0: - print(f"Milestone: {completed_count} jobs completed") +@app.route("/hooks/taskito", methods=["POST"]) +def receive_webhook(): + signature = request.headers.get("X-Taskito-Signature", "") + expected = hmac.new( + WEBHOOK_SECRET.encode(), request.data, hashlib.sha256 + ).hexdigest() -queue.on_event(EventType.JOB_COMPLETED, track_completion) + if not hmac.compare_digest(f"sha256={expected}", signature): + abort(401) + + event = request.json + print(f"Received event: {event['event']} for job {event['job_id']}") + return "", 204 ``` -### Database logging for audit trail +### Database audit trail (in-process listener) ```python from taskito.events import EventType -def audit_log(event_type: EventType, payload: dict): +def audit_log(event_type: EventType, payload: dict) -> None: db.execute( "INSERT INTO audit_log (event, job_id, task_name, timestamp) VALUES (?, ?, ?, ?)", (event_type.value, payload["job_id"], payload["task_name"], time.time()), ) -# Subscribe to all important events -for event in [EventType.JOB_ENQUEUED, EventType.JOB_COMPLETED, EventType.JOB_FAILED, EventType.JOB_DEAD]: +for event in [ + EventType.JOB_ENQUEUED, + EventType.JOB_COMPLETED, + EventType.JOB_FAILED, + EventType.JOB_DEAD, +]: queue.on_event(event, audit_log) ``` ## Event ordering -Events fire in the order the scheduler processes results — typically the -order jobs complete. For jobs that complete nearly simultaneously, ordering -is **not guaranteed** across different workers or threads. 
+Events fire in the order the scheduler processes results — typically +the order jobs complete. For jobs that complete nearly simultaneously, +ordering is **not guaranteed** across different workers or threads. Within a single job's lifecycle, events always fire in this order: @@ -236,50 +337,7 @@ Within a single job's lifecycle, events always fire in this order: 3. `JOB_RETRYING` (if retried, before the next attempt) 4. `JOB_DEAD` (if all retries exhausted) -## Backpressure - -Events are dispatched to a thread pool (default size: 4, configurable via -`event_workers=N`). If callbacks are slow and events arrive faster than -they can be processed, they queue in memory. - -For high-volume event scenarios: - -```python -queue = Queue(event_workers=16) # More threads for slow callbacks -``` +## Reference -If a callback raises an exception, it is logged and the event is dropped — -it does not retry or block other callbacks. - -## Webhook failure - -Webhooks retry with exponential backoff (up to `max_retries`). After all -retries are exhausted, the webhook delivery is **logged and dropped** — -there is no dead-letter queue for webhooks. Monitor webhook failures via -the `on_failure` callback or structured logging. 
- -### Webhook receiver (Flask) - -A minimal Flask app that receives and verifies taskito webhooks: - -```python -from flask import Flask, request, abort -import hashlib, hmac - -app = Flask(__name__) -WEBHOOK_SECRET = "my-signing-secret" - -@app.route("/hooks/taskito", methods=["POST"]) -def receive_webhook(): - signature = request.headers.get("X-Taskito-Signature", "") - expected = hmac.new( - WEBHOOK_SECRET.encode(), request.data, hashlib.sha256 - ).hexdigest() - - if not hmac.compare_digest(f"sha256={expected}", signature): - abort(401) - - event = request.json - print(f"Received event: {event['event']} for job {event['job_id']}") - return "", 204 -``` +- [Dashboard REST API for webhooks and deliveries](/guides/observability/dashboard-api#webhooks) +- [Dashboard auth — how to call these endpoints from a script](/guides/observability/dashboard-auth) diff --git a/docs/content/docs/guides/observability/dashboard-api.mdx b/docs/content/docs/guides/observability/dashboard-api.mdx index 2e3028ea..1fd63359 100644 --- a/docs/content/docs/guides/observability/dashboard-api.mdx +++ b/docs/content/docs/guides/observability/dashboard-api.mdx @@ -1,10 +1,55 @@ --- title: Dashboard REST API -description: "JSON endpoints for stats, jobs, dead letters, metrics, logs, infrastructure, observability." +description: "JSON endpoints for stats, jobs, dead letters, metrics, logs, infrastructure, observability, webhooks, and runtime overrides." --- -The dashboard exposes a JSON API you can use independently of the UI. All -endpoints return `application/json` with `Access-Control-Allow-Origin: *`. +import { Callout } from "fumadocs-ui/components/callout"; + +The dashboard exposes a JSON API you can use independently of the UI. +All endpoints return `application/json` and live under the same origin +as the dashboard itself. 
+ + + Every `/api/*` endpoint except the public set (`/api/auth/status`, + `/api/auth/login`, `/api/auth/setup`) requires a valid session cookie + obtained from `POST /api/auth/login`. State-changing requests + (POST/PUT/DELETE) additionally require a CSRF header. See + [Dashboard Authentication](/guides/observability/dashboard-auth) for + the login flow and headless usage examples. + + +## Auth + +### `GET /api/auth/status` + +Public. Returns whether the dashboard needs first-run setup. + +```json +{ "setup_required": false } +``` + +### `POST /api/auth/setup` + +Public, but locks itself after the first user is created. Body: +`{"username": "...", "password": "..."}`. Returns the new user. + +### `POST /api/auth/login` + +Body: `{"username": "...", "password": "..."}`. Sets the +`taskito_session` (HttpOnly) and `taskito_csrf` cookies on success. +Returns `400 invalid_credentials` on failure. + +### `POST /api/auth/logout` + +Invalidates the current session and clears cookies. + +### `GET /api/auth/whoami` + +Returns the current user, CSRF token, and expiry. `401` when no session. + +### `POST /api/auth/change-password` + +Body: `{"old_password": "...", "new_password": "..."}`. ## Stats @@ -25,13 +70,8 @@ Queue statistics snapshot. ### `GET /api/stats/queues` -Per-queue statistics. Pass `?queue=name` for a single queue, or omit for all -queues. - -```bash -curl http://localhost:8080/api/stats/queues -curl http://localhost:8080/api/stats/queues?queue=emails -``` +Per-queue statistics. Pass `?queue=name` for a single queue, or omit +for all queues. ## Jobs @@ -51,10 +91,6 @@ Paginated list of jobs with filtering. | `limit` | `int` | `20` | Page size | | `offset` | `int` | `0` | Pagination offset | -```bash -curl http://localhost:8080/api/jobs?status=running&limit=10 -``` - ### `GET /api/jobs/{id}` Full detail for a single job. @@ -79,41 +115,263 @@ Dependency graph for a job (nodes and edges). Cancel a pending job. 
-```json -{ "cancelled": true } -``` - ### `POST /api/jobs/{id}/replay` Replay a completed or failed job with the same payload. -```json -{ "replay_job_id": "01H5K7Y..." } -``` - ## Dead letters ### `GET /api/dead-letters` -Paginated list of dead letter entries. Supports `limit` and `offset` -parameters. +Paginated list of dead letter entries. Supports `limit` and `offset`. ### `POST /api/dead-letters/{id}/retry` Re-enqueue a dead letter job. +### `POST /api/dead-letters/purge` + +Purge all dead letters. + +## Webhooks + +Full guide: [Events & Webhooks](/guides/extensibility/events-webhooks). + +### `GET /api/webhooks` + +List all subscriptions. The `secret` field is **never** returned — only +a `has_secret` boolean. The secret is only included on the response to +`POST /api/webhooks` (create) and `POST /api/webhooks/{id}/rotate-secret`, +exactly once. + ```json -{ "new_job_id": "01H5K7Y..." } +[ + { + "id": "f00563cbbb1a4200bb461f83d1db47bf", + "url": "https://hooks.example.com/ops-failures", + "events": ["job.failed", "job.dead"], + "task_filter": null, + "headers": {}, + "has_secret": true, + "max_retries": 5, + "timeout_seconds": 8.0, + "retry_backoff": 2.0, + "enabled": true, + "description": "Page ops on permanent failures", + "created_at": 1716000000, + "updated_at": 1716000000 + } +] ``` -### `POST /api/dead-letters/purge` +### `POST /api/webhooks` -Purge all dead letters. +Create a subscription. + +| Field | Type | Description | +|---|---|---| +| `url` | `string` | Required. http/https URL, SSRF-vetted | +| `events` | `string[]` | Event types (`job.failed`, etc.). Empty/missing → all | +| `task_filter` | `string[] \| null` | Restrict to specific task names. 
`null` → all tasks | +| `headers` | `object` | Extra HTTP headers | +| `secret` | `string \| null` | Explicit signing key | +| `generate_secret` | `bool` | If true, server generates a fresh secret | +| `max_retries` | `int` | Default 3 | +| `timeout_seconds` | `float` | Default 10.0 | +| `retry_backoff` | `float` | Default 2.0 | +| `description` | `string \| null` | Free-form label | + +Response includes the secret **once** if one was set or generated. + +### `GET /api/webhooks/{id}` + +Single subscription (secret redacted). + +### `PUT /api/webhooks/{id}` + +Partial update. Only fields you include are touched. Same field set as +create. + +### `DELETE /api/webhooks/{id}` + +Delete the subscription. + +### `POST /api/webhooks/{id}/test` + +Synchronously POST a synthetic `test.ping` event and return the +result inline. ```json -{ "purged": 42 } +{ "status": 200, "delivered": true } ``` +### `POST /api/webhooks/{id}/rotate-secret` + +Generate a fresh HMAC secret. Returns `{id, secret}` — the only time +the new value is visible. + +### `GET /api/event-types` + +Sorted list of every valid event type value. Used by the dashboard's +event multi-select. + +```json +["job.cancelled", "job.completed", "job.dead", ...] +``` + +## Webhook deliveries + +### `GET /api/webhooks/{id}/deliveries` + +Persistent log of attempts for the subscription. Supports filters: + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `status` | `delivered \| failed \| dead \| pending` | all | Filter by outcome | +| `event` | `string` | all | Filter by event type | +| `limit` | `int` | `50` | Page size (max 200) | +| `offset` | `int` | `0` | Pagination offset | + +```json +{ + "items": [ + { + "id": "01H...", + "subscription_id": "f00563cb...", + "event": "job.failed", + "payload": { "job_id": "...", "task_name": "...", "error": "..." 
}, + "task_name": "myapp.tasks.process_image", + "job_id": "01H...", + "status": "dead", + "attempts": 3, + "response_code": 500, + "response_body": "Internal Server Error", + "latency_ms": 30000, + "error": null, + "created_at": 1716000000000, + "completed_at": 1716000030000 + } + ], + "total": 1, + "limit": 50, + "offset": 0 +} +``` + +### `GET /api/webhooks/{id}/deliveries/{delivery_id}` + +Single delivery record. + +### `POST /api/webhooks/{id}/deliveries/{delivery_id}/replay` + +Re-fire the stored payload synchronously. Records the outcome as a +fresh delivery on top of the original (audit trail preserved). + +```json +{ "replayed_of": "01H...", "status": 200, "delivered": true } +``` + +## Tasks and overrides + +Full guide: [Task & Queue Overrides](/guides/observability/task-overrides). + +### `GET /api/tasks` + +List every registered task with decorator defaults, override, and +effective values. + +```json +[ + { + "name": "myapp.tasks.send_email", + "queue": "default", + "defaults": { + "max_retries": 3, + "retry_backoff": 1.0, + "timeout": 300, + "priority": 0, + "rate_limit": null, + "max_concurrent": null + }, + "override": { "rate_limit": "200/m", "max_retries": 10 }, + "effective": { + "max_retries": 10, + "retry_backoff": 1.0, + "timeout": 300, + "priority": 0, + "rate_limit": "200/m", + "max_concurrent": null + }, + "paused": false + } +] +``` + +### `GET /api/tasks/{name}/override` + +Single task's override row. `404` if none set. + +### `PUT /api/tasks/{name}/override` + +Upsert the override. Body keys must be in the allow-list: +`rate_limit`, `max_concurrent`, `max_retries`, `retry_backoff`, +`timeout`, `priority`, `paused`. Passing `null` for a field removes +just that field. Unknown fields → `400`. + +### `DELETE /api/tasks/{name}/override` + +Remove the override entirely. Returns `{cleared: bool}`. + +### `GET /api/queues` + +List every queue mentioned by a task config with defaults, override, +effective, and paused state. 
+ +### `GET /api/queues/{name}/override` / `PUT` / `DELETE` + +Same shape as tasks. Allow-list for queue overrides: +`rate_limit`, `max_concurrent`, `paused`. The `paused` flag also flips +the live `paused_queues` table so it takes effect on running workers +immediately. + +## Middleware + +### `GET /api/middleware` + +List every registered middleware (global + per-task) with its scopes. + +```json +[ + { "name": "sentry", "class_path": "myapp.middleware.SentryMiddleware", "scopes": [{"kind": "global"}] } +] +``` + +### `GET /api/tasks/{name}/middleware` + +The middleware chain that fires for a task, with each entry's +`disabled` and `effective` flags. + +```json +{ + "task": "myapp.tasks.send_email", + "middleware": [ + { "name": "demo.logging", "class_path": "...", "disabled": false, "effective": true }, + { "name": "demo.metrics", "class_path": "...", "disabled": true, "effective": false } + ] +} +``` + +### `PUT /api/tasks/{name}/middleware/{mw_name}` + +Body: `{"enabled": bool}`. Returns `{task, disabled: [...]}` reflecting +the new disable list. `404` if the middleware name isn't registered on +the task — typos can't write no-op disables. + +### `DELETE /api/tasks/{name}/middleware` + +Clear all middleware disables for a task — every middleware fires +again. + ## Metrics ### `GET /api/metrics` @@ -129,12 +387,6 @@ Per-task execution metrics. Time-bucketed metrics for charts. -| Parameter | Type | Default | Description | -|---|---|---|---| -| `task` | `string` | all | Filter by task name | -| `since` | `int` | `3600` | Lookback window in seconds | -| `bucket` | `int` | `60` | Bucket size in seconds | - ## Logs ### `GET /api/logs` @@ -166,13 +418,9 @@ Worker resource health and pool status. List paused queue names. -### `POST /api/queues/{name}/pause` - -Pause a queue (jobs stop being dequeued). +### `POST /api/queues/{name}/pause` / `POST /api/queues/{name}/resume` -### `POST /api/queues/{name}/resume` - -Resume a paused queue. +Pause or resume a queue. 
Takes effect immediately. ## Observability @@ -190,45 +438,57 @@ KEDA-compatible autoscaler payload. Pass `?queue=name` for a specific queue. ### `GET /health` -Liveness check. Always returns `{"status": "ok"}`. +Public liveness check. Always returns `{"status": "ok"}`. ### `GET /readiness` -Readiness check with storage, worker, and resource health. +Public readiness check with storage, worker, and resource health. ### `GET /metrics` -Prometheus metrics endpoint (requires `prometheus-client` package). +Public Prometheus metrics endpoint (requires `prometheus-client` package). + +## Settings + +### `GET /api/settings` + +Dump of every dashboard setting key/value. + +### `GET /api/settings/{key}` / `PUT` / `DELETE` + +Read, set, or delete a single dashboard setting. Used by the dashboard +itself for branding, external links, and integration URLs — but you +can write your own keys here too. Note that this is the same store +where authentication, webhook subscriptions, delivery logs, and +runtime overrides live, all under namespaced prefixes (`auth:*`, +`webhooks:*`, `overrides:*`, etc.). ## Using the API programmatically ```python import requests -# Health check script -stats = requests.get("http://localhost:8080/api/stats").json() +s = requests.Session() +s.post( + "http://localhost:8080/api/auth/login", + json={"username": "admin", "password": "..."}, +) +csrf = s.cookies.get("taskito_csrf") +s.headers["X-CSRF-Token"] = csrf +# Health check script. +stats = s.get("http://localhost:8080/api/stats").json() if stats["dead"] > 0: print(f"WARNING: {stats['dead']} dead letter(s)") -if stats["running"] > 100: - print(f"WARNING: {stats['running']} jobs running, possible backlog") -``` - -```python -# Pause a queue during deployment -requests.post("http://localhost:8080/api/queues/default/pause") +# Tune a task's rate limit during an incident. 
+s.put( + "http://localhost:8080/api/tasks/myapp.tasks.send_email/override", + json={"rate_limit": "30/m"}, +) +# Pause a queue during deployment. +s.post("http://localhost:8080/api/queues/default/pause") # ... deploy ... - -# Resume after deployment -requests.post("http://localhost:8080/api/queues/default/resume") -``` - -```python -# Retry all dead letters -dead = requests.get("http://localhost:8080/api/dead-letters?limit=100").json() -for entry in dead: - requests.post(f"http://localhost:8080/api/dead-letters/{entry['id']}/retry") - print(f"Retried {entry['task_name']}") +s.post("http://localhost:8080/api/queues/default/resume") ``` diff --git a/docs/content/docs/guides/observability/dashboard-auth.mdx b/docs/content/docs/guides/observability/dashboard-auth.mdx new file mode 100644 index 00000000..38237684 --- /dev/null +++ b/docs/content/docs/guides/observability/dashboard-auth.mdx @@ -0,0 +1,158 @@ +--- +title: Dashboard Authentication +description: "Session-based login, CSRF, env-var bootstrap, and the setup-required flow." +--- + +import { Callout } from "fumadocs-ui/components/callout"; +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +The taskito dashboard is auth-gated by default. Until the first admin +exists, every protected API route returns `503 setup_required` and the +SPA shows the one-time setup form. Once an admin is registered the +dashboard requires a valid session cookie on every API call and a CSRF +token on every state-changing request. + +## How auth works + +- **Users + sessions** live in the existing `dashboard_settings` + key/value table — no new schema, so SQLite, PostgreSQL, and Redis + backends are supported uniformly. +- **Passwords** are hashed with stdlib `hashlib.pbkdf2_hmac` (SHA-256, + 600,000 iterations, 16-byte random salt — the OWASP 2023+ PBKDF2 + baseline). No third-party crypto dependency. +- **Sessions** are stored server-side under + `auth:session:` with a 24-hour TTL. 
The token rides in + an `HttpOnly` + `SameSite=Strict` cookie named `taskito_session`. +- **CSRF** uses the double-submit pattern: a non-HttpOnly cookie named + `taskito_csrf` carries a per-session token that the SPA reads and + echoes back via the `X-CSRF-Token` header on POST/PUT/DELETE. The + server rejects any state-changing request whose header doesn't match + both the cookie and the session-bound token. + +## First-run setup + +On a fresh database the dashboard refuses to do anything else until an +admin exists. + +![First-run setup form](/screenshots/dashboard/auth-setup.png) + +The form submits to `POST /api/auth/setup`, which is allowed to run +exactly once — it returns `400 setup already complete` after the first +user is created. The new admin is signed in automatically. + +### Bootstrap via environment variables + +For headless deployments (Docker, Kubernetes, systemd) you usually don't +want to visit a browser just to register the first user. Set both env +vars before starting the dashboard: + +```bash +export TASKITO_DASHBOARD_ADMIN_USER=admin +export TASKITO_DASHBOARD_ADMIN_PASSWORD='change-me-on-first-login' +taskito dashboard --app myapp:queue +``` + +The bootstrap is **idempotent** — once a user with that name exists, +subsequent dashboard restarts read the env vars but skip creation. + + + Rotate the password after first login (use ``POST /api/auth/change-password`` + or the future UI). Leaving the env var in your deployment is fine for + recovery, but anyone with access to the env can re-bootstrap a fresh + install — keep it scoped accordingly. + + +## Sign in + +After setup, every visit routes through the sign-in form. + +![Sign-in form](/screenshots/dashboard/auth-login.png) + +```bash +# Login from the CLI — note the cookie jar so subsequent requests +# carry the session. 
+curl -c jar -X POST http://localhost:8080/api/auth/login \ + -H 'Content-Type: application/json' \ + -d '{"username":"admin","password":"change-me-on-first-login"}' + +# The CSRF token comes back in a non-HttpOnly cookie. Read it and +# echo it on writes. +CSRF=$(grep taskito_csrf jar | awk '{print $7}') +curl -b jar -H "X-CSRF-Token: $CSRF" -X POST \ + http://localhost:8080/api/queues/default/pause +``` + +The browser SPA does this automatically — the `api-client.ts` wrapper +reads `document.cookie` and attaches the header on POST/PUT/DELETE. + +## API surface + +All routes live under `/api/auth/`: + +| Method | Path | What it does | +|---|---|---| +| `GET` | `/api/auth/status` | Public. Returns `{setup_required: bool}` | +| `POST` | `/api/auth/setup` | Public, locks itself after the first user | +| `POST` | `/api/auth/login` | Returns the user + session and sets cookies | +| `POST` | `/api/auth/logout` | Invalidates the current session, clears cookies | +| `GET` | `/api/auth/whoami` | Returns the current user + CSRF token + expiry | +| `POST` | `/api/auth/change-password` | Requires the current password | + +Every other route under `/api/` is auth-gated. Public exceptions: +`/health`, `/readiness`, `/metrics` (Prometheus), and the static SPA +assets. + +## Headless requests + +The same endpoints work for any HTTP client — Slack bots, +deployment scripts, custom dashboards. The minimal workflow: + +1. `POST /api/auth/login` — save the `Set-Cookie` values +2. For every read: send the `taskito_session` cookie +3. For every write: send the `taskito_session` cookie + `taskito_csrf` + cookie + matching `X-CSRF-Token` header + +```python +import requests + +s = requests.Session() +s.post( + "http://localhost:8080/api/auth/login", + json={"username": "admin", "password": "..."}, +) +csrf = s.cookies.get("taskito_csrf") +s.headers["X-CSRF-Token"] = csrf + +# Reads — no CSRF needed. 
+stats = s.get("http://localhost:8080/api/stats").json() + +# Writes — CSRF auto-attached via session headers. +s.post("http://localhost:8080/api/queues/default/pause") +``` + +## SSRF guard for outbound URLs + +Webhook URLs entered through the dashboard are vetted before any +delivery happens. By default the server rejects: + +- Non-`http`/`https` schemes +- `localhost`, `*.localhost`, `*.local`, `*.internal`, `*.intranet`, + `*.lan`, `*.private` +- Resolved addresses in any RFC1918 / loopback / link-local / + multicast range (including the AWS metadata service at + `169.254.169.254`) + +Set `TASKITO_WEBHOOKS_ALLOW_PRIVATE=1` to disable the guard for local +development against `http://localhost`. Production should keep the +guard on. + +## Limitations + +- **One role** today (`admin`). Read-only viewers and per-route + permissions are planned; the column already exists on the user + record. +- **No SSO / OIDC** out of the box. Put the dashboard behind a reverse + proxy (oauth2-proxy, Cloudflare Access) if your team uses SSO; the + built-in auth then becomes a fallback for service accounts. +- **Password rotation** has an endpoint but no UI yet — invoke + `POST /api/auth/change-password` directly. diff --git a/docs/content/docs/guides/observability/dashboard.mdx b/docs/content/docs/guides/observability/dashboard.mdx index 192500b2..e19fc825 100644 --- a/docs/content/docs/guides/observability/dashboard.mdx +++ b/docs/content/docs/guides/observability/dashboard.mdx @@ -1,15 +1,18 @@ --- title: Web Dashboard -description: "Zero-dependency built-in web UI for browsing jobs, metrics, workers, and managing the queue." +description: "Zero-dependency built-in web UI for browsing jobs, configuring webhooks, tuning per-task runtime limits, and managing the queue." 
--- import { Callout } from "fumadocs-ui/components/callout"; import { Tab, Tabs } from "fumadocs-ui/components/tabs"; taskito ships with a built-in web dashboard for monitoring jobs, inspecting -dead letters, and managing your task queue in real time. The dashboard is a -single-page application served directly from the Python package — **zero -extra dependencies required**. +dead letters, configuring webhooks, tuning per-task runtime limits, and +managing your task queue in real time. The dashboard is a single-page +application served directly from the Python package — **zero extra +dependencies required**. + +![Overview page with stats cards, throughput chart, and recent activity](/screenshots/dashboard/overview.png) ## Launching the dashboard @@ -51,8 +54,8 @@ taskito dashboard --app myapp:queue --host 0.0.0.0 --port 9000 ``` - The dashboard reads directly from the same SQLite database as the worker. - You can run them side by side without any coordination: + The dashboard reads directly from the same database as the worker. You + can run them side by side without any coordination: ```bash # Terminal 1 @@ -63,172 +66,128 @@ taskito dashboard --app myapp:queue --host 0.0.0.0 --port 9000 ``` -## Dashboard features - -The dashboard is a React + Vite + TypeScript SPA routed via TanStack Router, -styled with Tailwind v4 and shadcn/ui, and shipped as hash-busted multi-file -assets under `py_src/taskito/static/dashboard/`. - -### Design - -- **Dark and light mode** — Toggle between themes via the sun/moon button in the header. Preference is stored in `localStorage` and follows the system scheme by default. -- **Auto-refresh** — Configurable refresh interval (2s, 5s, 10s, or off) via the header dropdown. All pages auto-refresh at the selected interval; TanStack Query handles caching and background revalidation. -- **Command palette** — `⌘K` / `Ctrl+K` opens a cmdk palette for route navigation and common actions. -- **Icons** — Lucide icons throughout for visual clarity. 
-- **Toast notifications** — Every action shows a success or error toast via sonner. Optimistic mutations update the UI immediately and roll back on error. -- **Destructive confirms** — Irreversible actions (purge, retry all) use a type-to-confirm dialog. -- **Loading states** — Skeleton screens for tables and cards, error boundaries with retry. -- **Responsive layout** — Sidebar navigation with grouped sections (Monitoring, Infrastructure, Advanced). The main content area scrolls independently. - -### Pages - -| Page | Description | -|---|---| -| **Overview** | Stats cards with status icons, throughput sparkline chart, recent jobs table | -| **Jobs** | Filterable job listing (status, queue, task, metadata, error, date range) with pagination | -| **Job Detail** | Full job info, error history, task logs, replay history, dependency DAG visualization | -| **Metrics** | Per-task performance table (avg, P50, P95, P99) with timeseries chart and time range selector | -| **Logs** | Structured task execution logs with task/level filters | -| **Workers** | Worker cards with heartbeat status, queue assignments, and tags | -| **Queues** | Per-queue stats (pending/running), pause and resume controls | -| **Resources** | Worker DI runtime status — health, scope, init duration, pool stats, dependencies | -| **Circuit Breakers** | Automatic failure protection state (closed/open/half_open), thresholds, cooldowns | -| **Dead Letters** | Failed jobs that exhausted retries — retry individual entries or purge all | -| **System** | Proxy reconstruction and interception strategy metrics | - - - The built SPA ships inside the Python wheel under - `py_src/taskito/static/dashboard/` and is served by the Python dashboard - process. No Node.js, no pnpm, no CDN at runtime — just `pip install - taskito`. Node.js and pnpm are only needed by contributors rebuilding the - dashboard source in `dashboard/`. 
+ + On a fresh database the dashboard refuses every API request with + ``503 setup_required`` until you create the first admin. See + [Authentication](/guides/observability/dashboard-auth) for the full + flow, including the env-var bootstrap path useful for managed + deployments. -## Tutorial - -This walkthrough covers every dashboard page and how to use it. - -### Step 1: start the dashboard - -Start a worker and the dashboard in two terminals: - -```bash -# Terminal 1 — start the worker -taskito worker --app myapp:queue - -# Terminal 2 — start the dashboard -taskito dashboard --app myapp:queue -``` - -You should see: - -``` -taskito dashboard → http://127.0.0.1:8080 -Press Ctrl+C to stop -``` - -Open `http://localhost:8080` in your browser. - -### Step 2: Overview page - -The first page you see is the **Overview**. It shows: - -- **Stats cards** — Six cards at the top showing pending, running, completed, failed, dead, and cancelled job counts. -- **Throughput chart** — A green sparkline showing jobs processed per second over the last 60 refresh intervals. -- **Recent jobs table** — The 10 most recent jobs. Click any row to open its detail view. - -The stats update automatically based on the refresh interval you select in -the header (default: 5 seconds). +## Pages -### Step 3: browsing and filtering jobs +The dashboard is grouped by intent — Monitoring (what's happening), +Infrastructure (where it runs), Reliability (when it goes wrong), and +Configuration (how to change it): -Click **Jobs** in the sidebar. This page shows: - -- **Stats grid** — Same six stat cards as the overview. -- **Filter panel** — Status dropdown, queue, task, metadata, error text, created-after/before pickers. -- **Results table** — Paginated list showing ID, task, queue, status, priority, progress, retries, and created time. - -Use the **Prev / Next** buttons at the bottom to paginate. - -### Step 4: inspecting a job - -Click any job row to open the **Job Detail** page. 
The detail card shows: - -- A colored top border matching the job status (green for complete, red for failed, etc.) -- Full job ID, status badge, task name, queue, priority, progress bar, retries, timestamps -- **Error** field (if the job failed) displayed in a red-highlighted box -- Unique key and metadata (if set) - -**Actions:** - -- **Cancel Job** — Visible only for pending jobs. Sends a cancel request and shows a toast. -- **Replay** — Re-enqueue the job with the same payload. Navigates to the new job's detail page. - -**Sections below the detail card:** Error History, Task Logs, Replay -History, and a Dependency Graph visualization for jobs with dependencies. - -### Step 5: monitoring metrics +| Group | Page | What it does | +|---|---|---| +| Monitoring | **Overview** | Stats cards, throughput sparkline, queue-by-queue table | +| Monitoring | **Jobs** | Filterable job listing (status, queue, task, metadata, error, date range) | +| Monitoring | **Job Detail** | Full job info, error history, task logs, replay history, dependency DAG | +| Monitoring | **Metrics** | Per-task performance (avg, P50, P95, P99) with timeseries chart | +| Monitoring | **Logs** | Structured task execution logs with task/level filters | +| Infrastructure | **Queues** | Per-queue stats, pause and resume controls | +| Infrastructure | **Workers** | Worker cards with heartbeat status and queue assignments | +| Infrastructure | **Resources** | Worker DI runtime status — health, scope, init duration | +| Reliability | **Dead Letters** | Failed jobs that exhausted retries — retry or purge | +| Reliability | **Circuit Breakers** | Automatic failure protection state, thresholds, cooldowns | +| Reliability | **System** | Proxy reconstruction and interception strategy metrics | +| Configuration | **Tasks** | Decorator defaults + runtime overrides per task ([guide](/guides/observability/task-overrides)) | +| Configuration | **Webhooks** | HTTP event subscriptions with delivery history + replay 
([guide](/guides/extensibility/events-webhooks)) | +| Configuration | **Settings** | Dashboard branding, external links, integrations | + +The full REST API surface is documented at +[Dashboard REST API](/guides/observability/dashboard-api). + +## Design + +The dashboard is a React 19 + Vite 8 + TypeScript SPA routed via TanStack +Router, styled with Tailwind v4 and shadcn/ui, and shipped as +hash-busted multi-file assets under `py_src/taskito/static/dashboard/`. + +- **Dark and light mode** — Toggle via the sun/moon button in the header. + Preference is stored in `localStorage` and follows the system scheme by + default. +- **Auto-refresh** — Configurable interval (2s, 5s, 10s, or off) via the + header dropdown. TanStack Query handles caching and background + revalidation. +- **Command palette** — `⌘K` / `Ctrl+K` opens a `cmdk` palette for route + navigation. +- **Toast notifications** — Every action shows a success or error toast. + Optimistic mutations update the UI immediately and roll back on error. +- **Destructive confirms** — Irreversible actions (purge, delete) use a + type-to-confirm dialog. +- **Loading + error states** — Skeleton screens for tables and cards; + error boundaries with retry. -Click **Metrics** in the sidebar. This page shows a time-range selector (1h -/ 6h / 24h), a stacked bar chart of success/failure counts per time bucket, -and a per-task table with avg / P50 / P95 / P99 / min / max latency. + + The built SPA ships inside the Python wheel under + `py_src/taskito/static/dashboard/` and is served by the Python + dashboard process. No Node.js, no pnpm, no CDN at runtime — just + `pip install taskito`. Node.js and pnpm are only needed by + contributors rebuilding the dashboard source. + -### Step 6: viewing logs +## Walkthrough -Click **Logs** in the sidebar. Filter by task name or level. Each log entry -shows time, level badge, task name, job ID, message, and structured extra -data. 
+### Sign in -### Step 7: workers +On the first visit you'll see the setup form. After you create the +first admin, every subsequent visit shows the sign-in form. -Click **Workers**. Each active worker is displayed as a card showing the -green dot for liveness, worker ID, queues consumed, last heartbeat, -registration time, and tags. +![First-run setup form for the initial admin](/screenshots/dashboard/auth-setup.png) -### Step 8: managing queues +See [Authentication](/guides/observability/dashboard-auth) for the env +var-based bootstrap (`TASKITO_DASHBOARD_ADMIN_USER` / +`TASKITO_DASHBOARD_ADMIN_PASSWORD`) and the CSRF model. -Click **Queues**. Per-queue table with pending/running counts, pause/resume -buttons, and status badges. +### Browse jobs and dig into one - - Pausing a queue prevents the scheduler from dequeuing new jobs from it. - Jobs already running will complete normally. Enqueuing new jobs still - works — they'll be picked up when the queue is resumed. - +The **Jobs** page shows a filterable, paginated table. Filters live in +the sidebar panel: status, queue, task name, metadata search, error +text, date range. Click any row to open the detail view with the full +job state, error history, task logs, replay history, and a dependency +DAG for jobs with relationships. -### Step 9: resources +![Jobs page with filter panel and paginated list](/screenshots/dashboard/jobs.png) -Click **Resources**. Shows registered worker DI runtime entries (name, -scope, health, init duration, recreations, dependencies, pool stats). +### Configure webhooks -### Step 10: circuit breakers +The **Webhooks** page lists every HTTP endpoint subscribed to job +events. Add new endpoints with the **+ New webhook** button. Each row +has a dropdown menu — send a test event, enable/disable, rotate the +signing secret, or view the delivery history. Full guide: +[Events & Webhooks](/guides/extensibility/events-webhooks). -Click **Circuit Breakers**. 
State badge (closed/open/half_open), failure -count, threshold, window, cooldown. +![Webhooks page with three subscriptions in different states](/screenshots/dashboard/webhooks-list.png) -### Step 11: dead letter queue +### Tune per-task limits -Click **Dead Letters**. Retry individual entries with the **Retry** button, -or purge all with the type-to-confirm **Purge All** in the header. +The **Tasks** page lists every registered task with its decorator +defaults and any active runtime override. Click **Edit** to open a +side sheet with two tabs: **Overrides** (rate limit, concurrency, +retries, timeout, priority, paused) and **Middleware** (toggle each +middleware on or off for the task). Full guide: +[Task & Queue Overrides](/guides/observability/task-overrides). -### Step 12: system internals +![Tasks page with one task overridden in accent](/screenshots/dashboard/tasks-list.png) -Click **System**. Two tables: Proxy Reconstruction (per-handler metrics) -and Interception (per-strategy metrics). +### Manage queues -### Step 13: switching themes +The **Queues** page lists every queue mentioned by a registered task, +showing pending/running counts and the current pause state. Pause and +resume buttons take effect immediately on the running worker. -Click the sun/moon icon in the top-right of the header. +![Queues page with per-queue controls](/screenshots/dashboard/queues.png) -### Step 14: changing refresh rate +### Inspect workers -Use the **Refresh** dropdown in the header — 2s, 5s, 10s, or off. +The **Workers** page lists every registered worker with heartbeat +status, the queues it consumes from, tags, and registration time. Stale +workers (no heartbeat for 30s) automatically transition to "offline". - - The dashboard also exposes a full JSON API. See the - [Dashboard REST API](/guides/observability/dashboard-api) reference - for all endpoints. 
- +![Workers page showing a single active worker](/screenshots/dashboard/workers.png) ## Development @@ -253,13 +212,23 @@ pnpm run build automatically from the version pinned in `dashboard/package.json`. -The build produces a static `index.html` plus hashed JS/CSS chunks under -`py_src/taskito/static/dashboard/`. The built assets aren't committed — -release tooling runs `pnpm -C dashboard build` before packaging so the -wheel ships them. +The build produces a static `index.html` plus hashed JS/CSS chunks +under `py_src/taskito/static/dashboard/`. The built assets aren't +committed — release tooling runs `pnpm -C dashboard build` before +packaging so the wheel ships them. - - The dashboard does not include authentication. If you expose it beyond - `localhost`, place it behind a reverse proxy with authentication (e.g. - nginx with basic auth, or an OAuth2 proxy). - +### Regenerating screenshots + +Every dashboard screenshot in this documentation is produced by a +reproducible script that seeds a fresh queue, walks the UI in headless +Chrome via Playwright, and writes PNGs into `docs/public/screenshots/dashboard/`: + +```bash +uv sync --extra docs # one-time +uv run python -m playwright install chromium # one-time +uv run python scripts/capture_docs_screenshots.py +``` + +Pass `--skip-capture` to start the seeded demo dashboard in a browser +without running Playwright — useful when iterating on UI changes +locally. diff --git a/docs/content/docs/guides/observability/index.mdx b/docs/content/docs/guides/observability/index.mdx index 9570dd97..c1153632 100644 --- a/docs/content/docs/guides/observability/index.mdx +++ b/docs/content/docs/guides/observability/index.mdx @@ -10,4 +10,6 @@ Monitor, log, and inspect your task queue in real time. 
| [Monitoring & Hooks](/guides/observability/monitoring) | Queue stats, progress tracking, worker heartbeat, and alerting hooks | | [Structured Logging](/guides/observability/logging) | Per-task structured logs with automatic context | | [Web Dashboard](/guides/observability/dashboard) | Built-in web UI for browsing jobs, metrics, and worker status | +| [Dashboard Authentication](/guides/observability/dashboard-auth) | Setup flow, session cookies, CSRF, env-var bootstrap | +| [Task & Queue Overrides](/guides/observability/task-overrides) | Runtime knobs for retry policy, rate limits, concurrency, and middleware toggles | | [Dashboard REST API](/guides/observability/dashboard-api) | Programmatic access to all dashboard data via REST endpoints | diff --git a/docs/content/docs/guides/observability/meta.json b/docs/content/docs/guides/observability/meta.json index 474b3748..145ef7f2 100644 --- a/docs/content/docs/guides/observability/meta.json +++ b/docs/content/docs/guides/observability/meta.json @@ -1,4 +1,12 @@ { "title": "Observability", - "pages": ["monitoring", "logging", "notes", "dashboard", "dashboard-api"] + "pages": [ + "monitoring", + "logging", + "notes", + "dashboard", + "dashboard-auth", + "task-overrides", + "dashboard-api" + ] } diff --git a/docs/content/docs/guides/observability/task-overrides.mdx b/docs/content/docs/guides/observability/task-overrides.mdx new file mode 100644 index 00000000..562000bd --- /dev/null +++ b/docs/content/docs/guides/observability/task-overrides.mdx @@ -0,0 +1,222 @@ +--- +title: Task & Queue Overrides +description: "Tune retry policy, concurrency, rate limits, and middleware per task from the dashboard — without redeploying." +--- + +import { Callout } from "fumadocs-ui/components/callout"; +import { Tab, Tabs } from "fumadocs-ui/components/tabs"; + +The decorator-declared values on `@queue.task(...)` are *defaults*. 
The +dashboard lets operators override them at runtime — adjust a rate +limit, pause a misbehaving task, lower the retry budget after an +incident — without redeploying. + +Two surfaces: + +- **Task overrides** — per-task knobs: rate limit, concurrency, + retries, retry backoff, timeout, priority, paused. +- **Queue overrides** — per-queue knobs: rate limit, concurrency, + paused. + +Plus a separate but related toggle for **middleware** on a per-task +basis (see [§ Middleware toggles](#middleware-toggles) below). + +## Tasks page + +The **Tasks** page lists every task registered on the live Queue with +its decorator defaults, any active override, and the *effective* value +(default merged with override). Overridden values render in accent so +"which knobs are pinned" is visible at a glance. + +![Tasks page with one override active](/screenshots/dashboard/tasks-list.png) + +Click **Edit** on any row to open the side sheet. The form mirrors +the decorator kwargs: + +![Override side sheet on the send_email task](/screenshots/dashboard/task-edit-overrides.png) + +- **Empty input** → inherit the decorator default +- **Value entered** → override the default +- **Clear override** → remove the row entirely; task falls back to + every decorator value + +| Field | Decorator equivalent | +|---|---| +| Rate limit | `rate_limit="100/m"` | +| Max concurrent | `max_concurrent=10` | +| Max retries | `max_retries=5` | +| Timeout | `timeout=300` (seconds) | +| Priority | `priority=2` | +| Paused | n/a — runtime-only | + +## When changes take effect + +This is the most important thing to internalize: + +| Change | Takes effect | +|---|---| +| Pausing a task | Next worker restart for the rate-limit/concurrency side effects, **but** the paused flag is plumbed through the live `paused_queues` mechanism so the scheduler stops dequeuing immediately for queue-level pauses | +| Pausing a queue | **Immediately** on running workers (writes to `paused_queues`) | +| Rate limit / max concurrent 
/ retries / timeout / priority on a task | **Next worker restart** — the values are baked into `PyTaskConfig` at `run_worker` time and passed to the Rust scheduler | +| Rate limit / max concurrent on a queue | **Next worker restart** — same mechanism, merged into `queue_configs` JSON sent to Rust | +| Middleware on/off per task | **Next job** — middleware lookup happens at every task invocation | + +This split is intentional. Pause is a fast-path safety valve; +retry/rate-limit changes need scheduler buy-in and are deliberately +"restart to apply" so operators have a clear mental model of when the +new values take over. + + + Pulling rate-limit / retries / timeout into the Rust scheduler's + per-poll lookup would let those changes hot-reload too. The + ``PyTaskConfig`` → scheduler path would gain a cache-invalidation + counter (incremented on every override write) the poller checks + before each admission cycle. Until then, restart the worker to apply + changes to those knobs. + + +## Programmatic API + +The dashboard CRUD is a thin shell over the `Queue` API — you can +script overrides the same way: + +```python +from taskito import Queue + +queue = Queue(db_path="tasks.db") + +# Tasks +queue.set_task_override( + "myapp.tasks.send_email", + rate_limit="200/m", + max_retries=10, +) +queue.set_task_override("myapp.tasks.send_email", paused=True) # immediate-ish +queue.clear_task_override("myapp.tasks.send_email") + +# Queues +queue.set_queue_override("email", max_concurrent=5) +queue.set_queue_override("email", paused=True) # immediate +queue.clear_queue_override("email") + +# Discovery — what's registered + what's overridden +for entry in queue.registered_tasks(): + print(entry["name"], entry["effective"]) + +for entry in queue.registered_queues(): + print(entry["name"], entry["effective"]) +``` + +Allowed task override fields: `rate_limit`, `max_concurrent`, +`max_retries`, `retry_backoff`, `timeout`, `priority`, `paused`. 
+ +Allowed queue override fields: `rate_limit`, `max_concurrent`, +`paused`. + +The store validates types and ranges before persisting — a typo (or a +typed-in `-1`) raises `ValueError` rather than writing garbage. The +dashboard handler surfaces the same errors as `400 Bad Request`. + +## Storage + +Overrides live as JSON entries under +`overrides:task:<task_name>` and `overrides:queue:<queue_name>` keys +in the `dashboard_settings` table. SQLite, PostgreSQL, and Redis +backends all support them uniformly — no new schema. The encoded +JSON only includes fields the operator actually set, so removing a +field by passing `None` shrinks the row rather than leaving stale +data. + +## Middleware toggles + +Middleware are normally global (via `Queue(middleware=[...])`) or +per-task (via `@queue.task(middleware=[...])`). The dashboard adds a +third axis: **temporarily disable a middleware for one task** without +touching code. Useful when: + +- A logging middleware is generating noise for one chatty task +- A retry-policy middleware is interfering with a specific debug job +- You want to A/B compare runs with and without a middleware + +### Toggle from the dashboard + +Open the same side sheet as for overrides and switch to the +**Middleware** tab. Each registered middleware shows up as a pill +button — green for enabled, grey for disabled. + +![Middleware tab with one toggle disabled](/screenshots/dashboard/task-edit-middleware.png) + +Changes take effect on the **next job** — no worker restart required. +The middleware lookup runs at every task invocation, so the next time +the task is dequeued the new chain applies. + +### Naming middleware + +Every `TaskMiddleware` carries a stable `name` attribute that the +disable list keys on. By default the name is the fully-qualified class +path (e.g. `myapp.middleware.LoggingMiddleware`) so it survives +restarts.
Override it to pin a shorter, user-facing name: + +```python +from taskito.middleware import TaskMiddleware + +class SentryMiddleware(TaskMiddleware): + name = "sentry" # shows up as "sentry" in the dashboard + + def before(self, ctx): + ... +``` + +The dashboard rejects toggles for unknown middleware names (`404`), so +typos can't silently write no-op disables. + +### Programmatic API + +```python +queue.list_middleware() # [{name, class_path, scopes}, ...] +queue.disable_middleware_for_task("myapp.tasks.send_email", "demo.metrics") +queue.enable_middleware_for_task("myapp.tasks.send_email", "demo.metrics") +queue.clear_middleware_disables("myapp.tasks.send_email") +queue.get_disabled_middleware_for("myapp.tasks.send_email") # ["demo.metrics"] +``` + +## Examples + +### Pause one task without redeploying + +A flaky third-party API is rate-limiting your `send_email` task and +you want to stop new sends while you investigate: + +```python +queue.set_task_override("myapp.tasks.send_email", paused=True) +# ... or from the dashboard: Tasks → send_email → Edit → check "Pause this task" +``` + +Existing in-flight jobs finish normally; nothing new dequeues until +you clear the override. + +### Lower a rate limit during an incident + +Cut `send_email` from 200/m to 30/m while a downstream is recovering: + +```python +queue.set_task_override("myapp.tasks.send_email", rate_limit="30/m") +# Restart the workers for the change to take effect on the scheduler. +``` + +### Disable a heavyweight middleware for one task + +A debug middleware is dumping payloads for every invocation, and you +want to keep it on for everything except your high-volume +`process_image` task: + +```python +queue.disable_middleware_for_task("myapp.tasks.process_image", "debug.payload") +# Takes effect on the next process_image job, no restart needed. 
+``` + +## Reference + +- [Dashboard REST API: Tasks & overrides](/guides/observability/dashboard-api#tasks-and-overrides) +- [Dashboard REST API: Middleware](/guides/observability/dashboard-api#middleware) +- [Tasks decorator reference](/api-reference/task) diff --git a/docs/public/screenshots/dashboard/auth-login.png b/docs/public/screenshots/dashboard/auth-login.png new file mode 100644 index 00000000..fbcfe56d Binary files /dev/null and b/docs/public/screenshots/dashboard/auth-login.png differ diff --git a/docs/public/screenshots/dashboard/auth-setup.png b/docs/public/screenshots/dashboard/auth-setup.png new file mode 100644 index 00000000..98c08268 Binary files /dev/null and b/docs/public/screenshots/dashboard/auth-setup.png differ diff --git a/docs/public/screenshots/dashboard/jobs.png b/docs/public/screenshots/dashboard/jobs.png new file mode 100644 index 00000000..e6b9bf36 Binary files /dev/null and b/docs/public/screenshots/dashboard/jobs.png differ diff --git a/docs/public/screenshots/dashboard/overview.png b/docs/public/screenshots/dashboard/overview.png new file mode 100644 index 00000000..ef6e3a38 Binary files /dev/null and b/docs/public/screenshots/dashboard/overview.png differ diff --git a/docs/public/screenshots/dashboard/queues.png b/docs/public/screenshots/dashboard/queues.png new file mode 100644 index 00000000..8e797616 Binary files /dev/null and b/docs/public/screenshots/dashboard/queues.png differ diff --git a/docs/public/screenshots/dashboard/task-edit-middleware.png b/docs/public/screenshots/dashboard/task-edit-middleware.png new file mode 100644 index 00000000..c8498b63 Binary files /dev/null and b/docs/public/screenshots/dashboard/task-edit-middleware.png differ diff --git a/docs/public/screenshots/dashboard/task-edit-overrides.png b/docs/public/screenshots/dashboard/task-edit-overrides.png new file mode 100644 index 00000000..282c768b Binary files /dev/null and b/docs/public/screenshots/dashboard/task-edit-overrides.png differ diff --git 
a/docs/public/screenshots/dashboard/tasks-list.png b/docs/public/screenshots/dashboard/tasks-list.png new file mode 100644 index 00000000..6b9ad85f Binary files /dev/null and b/docs/public/screenshots/dashboard/tasks-list.png differ diff --git a/docs/public/screenshots/dashboard/webhook-create-dialog.png b/docs/public/screenshots/dashboard/webhook-create-dialog.png new file mode 100644 index 00000000..4ddc6157 Binary files /dev/null and b/docs/public/screenshots/dashboard/webhook-create-dialog.png differ diff --git a/docs/public/screenshots/dashboard/webhook-deliveries.png b/docs/public/screenshots/dashboard/webhook-deliveries.png new file mode 100644 index 00000000..3a1a318b Binary files /dev/null and b/docs/public/screenshots/dashboard/webhook-deliveries.png differ diff --git a/docs/public/screenshots/dashboard/webhooks-list.png b/docs/public/screenshots/dashboard/webhooks-list.png new file mode 100644 index 00000000..af342e65 Binary files /dev/null and b/docs/public/screenshots/dashboard/webhooks-list.png differ diff --git a/docs/public/screenshots/dashboard/workers.png b/docs/public/screenshots/dashboard/workers.png new file mode 100644 index 00000000..0535e6e1 Binary files /dev/null and b/docs/public/screenshots/dashboard/workers.png differ diff --git a/py_src/taskito/dashboard/server.py b/py_src/taskito/dashboard/server.py index 2aa17539..bdb56eda 100644 --- a/py_src/taskito/dashboard/server.py +++ b/py_src/taskito/dashboard/server.py @@ -14,7 +14,7 @@ import logging from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from typing import TYPE_CHECKING, Any -from urllib.parse import parse_qs, urlparse +from urllib.parse import parse_qs, unquote, urlparse from taskito.dashboard.auth import ( DEFAULT_SESSION_TTL_SECONDS, @@ -181,17 +181,17 @@ def _handle_get(self) -> None: for pattern, param_handler in GET_PARAM_ROUTES: m = pattern.match(path) if m: - self._dispatch_with_handler( - param_handler, lambda h, m=m: h(queue, qs, m.group(1)) - ) + g1 = 
unquote(m.group(1)) + self._dispatch_with_handler(param_handler, lambda h, g1=g1: h(queue, qs, g1)) return for pattern, param_handler in GET_PARAM2_ROUTES: m = pattern.match(path) if m: + g1, g2 = unquote(m.group(1)), unquote(m.group(2)) self._dispatch_with_handler( param_handler, - lambda h, m=m: h(queue, qs, (m.group(1), m.group(2))), + lambda h, g1=g1, g2=g2: h(queue, qs, (g1, g2)), ) return @@ -263,15 +263,17 @@ def _handle_post(self) -> None: for pattern, param_handler in POST_PARAM_ROUTES: m = pattern.match(path) if m: - self._dispatch_with_handler(param_handler, lambda h, m=m: h(queue, m.group(1))) + g1 = unquote(m.group(1)) + self._dispatch_with_handler(param_handler, lambda h, g1=g1: h(queue, g1)) return for pattern, param_handler in POST_PARAM2_ROUTES: m = pattern.match(path) if m: + g1, g2 = unquote(m.group(1)), unquote(m.group(2)) self._dispatch_with_handler( param_handler, - lambda h, m=m: h(queue, (m.group(1), m.group(2))), + lambda h, g1=g1, g2=g2: h(queue, (g1, g2)), ) return @@ -289,8 +291,9 @@ def _handle_put(self) -> None: body = self._read_json_body() if body is None: return + g1 = unquote(m.group(1)) self._dispatch_with_handler( - param_handler, lambda h, m=m, body=body: h(queue, body, m.group(1)) + param_handler, lambda h, g1=g1, body=body: h(queue, body, g1) ) return for pattern, param_handler in PUT_PARAM2_ROUTES: @@ -299,9 +302,10 @@ def _handle_put(self) -> None: body = self._read_json_body() if body is None: return + g1, g2 = unquote(m.group(1)), unquote(m.group(2)) self._dispatch_with_handler( param_handler, - lambda h, m=m, body=body: h(queue, body, (m.group(1), m.group(2))), + lambda h, g1=g1, g2=g2, body=body: h(queue, body, (g1, g2)), ) return self._json_response({"error": "Not found"}, status=404) @@ -315,7 +319,8 @@ def _handle_delete(self) -> None: for pattern, param_handler in DELETE_PARAM_ROUTES: m = pattern.match(path) if m: - self._dispatch_with_handler(param_handler, lambda h, m=m: h(queue, m.group(1))) + g1 = 
unquote(m.group(1)) + self._dispatch_with_handler(param_handler, lambda h, g1=g1: h(queue, g1)) return self._json_response({"error": "Not found"}, status=404) diff --git a/pyproject.toml b/pyproject.toml index 9fdfdddf..29509ccf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ encryption = ["cryptography"] flask = ["flask>=3.0"] aws = ["boto3>=1.34"] gcs = ["google-cloud-storage>=2.10"] +docs = ["playwright>=1.59"] [tool.maturin] manifest-path = "crates/taskito-python/Cargo.toml" @@ -148,3 +149,4 @@ ignore_missing_imports = true [[tool.mypy.overrides]] module = "click" ignore_missing_imports = true + diff --git a/scripts/capture_docs_screenshots.py b/scripts/capture_docs_screenshots.py new file mode 100644 index 00000000..8f4c3cc3 --- /dev/null +++ b/scripts/capture_docs_screenshots.py @@ -0,0 +1,482 @@ +#!/usr/bin/env python3 +"""Reproducible screenshot capture for the documentation site. + +Spins up a fresh Taskito Queue, seeds it with deterministic demo data +(admin user, sample tasks/queues, webhooks with mixed delivery outcomes, +runtime overrides, a middleware disable), starts the dashboard on a +random port, drives a headless Chromium through every screen, and saves +PNGs under ``docs/public/screenshots/dashboard/``. + +Run from the repo root: + + uv run --with playwright python scripts/capture_docs_screenshots.py + # First time only: + uv run --with playwright python -m playwright install chromium + +The script is **idempotent**: every run overwrites the previous PNGs and +starts from an empty SQLite DB in a temp directory, so there is no +"works on my machine" drift. 
+""" + +from __future__ import annotations + +import argparse +import contextlib +import json +import socket +import sys +import tempfile +import threading +import time +from http.server import BaseHTTPRequestHandler, HTTPServer, ThreadingHTTPServer +from pathlib import Path +from typing import Any + +from taskito import Queue +from taskito.dashboard import _make_handler +from taskito.dashboard.auth import AuthStore +from taskito.dashboard.delivery_store import DeliveryStore +from taskito.events import EventType +from taskito.middleware import TaskMiddleware + +ADMIN_USER = "demo-admin" +ADMIN_PASSWORD = "demo-pass-1234" + +REPO_ROOT = Path(__file__).resolve().parent.parent +SCREENSHOT_DIR = REPO_ROOT / "docs" / "public" / "screenshots" / "dashboard" + + +# ── Demo middleware ────────────────────────────────────────────────── + + +class LoggingMiddleware(TaskMiddleware): + """Demo middleware that the screenshots reference.""" + + name = "demo.logging" + + +class MetricsMiddleware(TaskMiddleware): + name = "demo.metrics" + + +# ── Seed data ──────────────────────────────────────────────────────── + + +# Task body functions are defined at module level so their qualnames stay +# clean (``myapp.tasks.send_email`` rather than +# ``capture_docs_screenshots.seed_queue.<locals>.send_email``) — much nicer +# in screenshots. ``seed_queue`` registers them with the live Queue. + + +def send_email(to: str) -> str: + return f"sent:{to}" + + +def deliver_message(message: str) -> str: + return message + + +def sync_metrics() -> None: + pass + + +send_email.__module__ = "myapp.tasks" +send_email.__qualname__ = "send_email" +deliver_message.__module__ = "myapp.tasks" +deliver_message.__qualname__ = "deliver_message" +sync_metrics.__module__ = "myapp.tasks" +sync_metrics.__qualname__ = "sync_metrics" + + +def seed_queue(queue: Queue) -> None: + """Populate the demo queue with realistic data for the screenshots. + + Returns nothing — the dashboard reads everything from storage.
+ """ + # Admin user — header drop-down shows "demo-admin" in the screenshots. + AuthStore(queue).create_user(ADMIN_USER, ADMIN_PASSWORD, role="admin") + + # Tasks — defaults vary so the Tasks table has visual variety. + queue.task()(send_email) + queue.task( + queue="email", + max_retries=5, + timeout=120, + rate_limit="100/m", + max_concurrent=10, + )(deliver_message) + queue.task(queue="metrics", priority=2)(sync_metrics) + + # Queue-level configuration. set_queue_concurrency goes through the + # same code path as the dashboard override apply. + queue.set_queue_concurrency("email", 10) + + # Override the send_email task — Tasks page should show this in accent. + queue.set_task_override( + next(c.name for c in queue._task_configs if c.name.endswith("send_email")), + rate_limit="200/m", + max_retries=10, + ) + + # Webhook subscriptions — one fully configured, one disabled, one + # filtered to a specific task. + import os + + os.environ["TASKITO_WEBHOOKS_ALLOW_PRIVATE"] = "1" # echo server is loopback + + sub1 = queue.add_webhook( + url="https://hooks.example.com/ops-failures", + events=[EventType.JOB_FAILED, EventType.JOB_DEAD], + secret="whsec_demo_signing_secret", + description="Page ops on permanent job failures", + max_retries=5, + timeout=8.0, + ) + # Second subscription: filters by task name. Captured for visual + # contrast in the webhooks-list screenshot; we don't reuse its id. + queue.add_webhook( + url="https://audit.internal.example.com/taskito-events", + events=None, + task_filter=["myapp.tasks.send_email"], + description="Audit log for send_email only", + ) + sub3 = queue.add_webhook( + url="https://staging-hooks.example.com/all-events", + description="Staging echo — disabled", + ) + queue.update_webhook(sub3.id, enabled=False) + + # Synthesize delivery history for sub1 so the Deliveries page has rows. 
+ store = DeliveryStore(queue) + base_time = int(time.time() * 1000) + deliveries = [ + ("delivered", 200, 42, "job.completed", "myapp.tasks.process_image"), + ("delivered", 200, 38, "job.completed", "myapp.tasks.send_email"), + ("delivered", 200, 51, "job.completed", "myapp.tasks.send_email"), + ("failed", 504, 9500, "job.failed", "myapp.tasks.process_image"), + ("delivered", 200, 44, "job.completed", "myapp.tasks.send_email"), + ("dead", 500, 30000, "job.dead", "myapp.tasks.process_image"), + ("delivered", 200, 39, "job.completed", "myapp.tasks.send_email"), + ] + for i, (status, code, lat, event, task_name) in enumerate(deliveries): + record = store.record_attempt( + sub1.id, + event=event, + payload={ + "task_name": task_name, + "job_id": f"01H{i:02d}DEMOXYZ{i}", + "queue": "default", + }, + status=status, + attempts=3 if status == "dead" else 1, + response_code=code if status != "delivered" or code == 200 else None, + latency_ms=lat, + response_body=( + None + if status == "delivered" + else "Internal Server Error\nstack trace here..." + ), + task_name=task_name, + job_id=f"01H{i:02d}DEMOXYZ{i}", + ) + # Backdate so the "When" column shows a range of relative times. + _backdate_delivery(queue, sub1.id, record.id, base_time - i * 600_000) + + # Disable one middleware on one task so the Middleware tab has a + # mix of green / grey toggles. 
+ send_email_full = next( + c.name for c in queue._task_configs if c.name.endswith("send_email") + ) + queue.disable_middleware_for_task(send_email_full, "demo.metrics") + + +def _backdate_delivery(queue: Queue, sub_id: str, record_id: str, ts: int) -> None: + """Rewrite a delivery's ``created_at`` so the deliveries table shows + a believable range of relative times in the screenshot rather than + a clump of "just now" rows.""" + key = f"webhooks:deliveries:{sub_id}" + raw = queue.get_setting(key) + if not raw: + return + rows = json.loads(raw) + for row in rows: + if row.get("id") == record_id: + row["created_at"] = ts + if row.get("completed_at") is not None: + row["completed_at"] = ts + 50 + queue.set_setting(key, json.dumps(rows, separators=(",", ":"))) + return + + +# ── Webhook echo server (for the live send-test screenshot) ────────── + + +def start_echo_server() -> tuple[str, HTTPServer]: + """Local server the test webhook delivers to during the captures.""" + + class Handler(BaseHTTPRequestHandler): + def do_POST(self) -> None: + self.send_response(200) + self.end_headers() + self.wfile.write(b"ok") + + def log_message(self, *args: Any) -> None: + pass + + server = HTTPServer(("127.0.0.1", 0), Handler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + return f"http://127.0.0.1:{server.server_address[1]}", server + + +# ── Dashboard process ──────────────────────────────────────────────── + + +def start_dashboard(queue: Queue) -> tuple[str, ThreadingHTTPServer]: + """Start the dashboard on a random localhost port.""" + handler = _make_handler(queue) + # Bind to port 0 → kernel picks a free port. 
+ server = ThreadingHTTPServer(("127.0.0.1", 0), handler) + threading.Thread(target=server.serve_forever, daemon=True).start() + base = f"http://127.0.0.1:{server.server_address[1]}" + _wait_for_port(server.server_address[1]) + return base, server + + +def _wait_for_port(port: int, timeout: float = 5.0) -> None: + deadline = time.time() + timeout + while time.time() < deadline: + with socket.socket() as sock: + try: + sock.settimeout(0.2) + sock.connect(("127.0.0.1", port)) + return + except OSError: + time.sleep(0.05) + raise RuntimeError(f"dashboard did not bind on port {port}") + + +# ── Capture flow ───────────────────────────────────────────────────── + + +def capture_all(base_url: str) -> None: + """Walk every dashboard page and save a PNG per screen. + + Uses Playwright's sync API. Each screenshot is named for its target + location in the docs so the MDX side can reference them by stable path. + """ + from playwright.sync_api import sync_playwright + + SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True) + print(f"Capturing screenshots into {SCREENSHOT_DIR}") + + with sync_playwright() as p: + # Use the system Chrome so no extra browser download is needed. + browser = p.chromium.launch(headless=True, channel="chrome") + # Width 1280 matches the dashboard's ``max-w-[1400px]`` content + # area; height 800 keeps each capture above-the-fold without huge + # screenshots. ``deviceScaleFactor=2`` gives crisp HiDPI output. + context = browser.new_context( + viewport={"width": 1280, "height": 800}, + device_scale_factor=2, + ) + page = context.new_page() + + # ── Phase 1: auth ───────────────────────────────────────── + login_and_screenshot_setup_then_login(page, base_url) + + # Pre-fetch the cookie for the rest of the flow — the dashboard + # uses session cookies, and after the login flow above we already + # have them. So the page context is now authenticated. 
+ + # ── Main pages ─────────────────────────────────────────── + capture_each( + page, + base_url, + [ + ("/", "overview"), + ("/jobs", "jobs"), + ("/queues", "queues"), + ("/workers", "workers"), + ], + wait_for_text={"/": "Overview", "/jobs": "Jobs", "/queues": "Queues"}, + ) + + # ── Phase 2/3: webhooks ────────────────────────────────── + capture_page(page, f"{base_url}/webhooks", "webhooks-list") + # Drive the deliveries view via the visible UI — same path an + # operator would take. + page.locator('button[aria-label="Webhook actions"]').first.click() + page.get_by_role("menuitem", name="View deliveries").click() + page.wait_for_url("**/deliveries", timeout=5000) + page.wait_for_load_state("networkidle") + time.sleep(1.2) + screenshot(page, "webhook-deliveries") + + # Open the create-webhook dialog for the form screenshot. + page.goto(f"{base_url}/webhooks", wait_until="networkidle") + page.get_by_role("button", name="New webhook").click() + page.wait_for_selector("text=Subscribe an HTTP endpoint", timeout=3000) + time.sleep(0.3) # let the dialog finish animating in + screenshot(page, "webhook-create-dialog") + + # ── Phase 4/5: tasks + middleware ──────────────────────── + capture_page(page, f"{base_url}/tasks", "tasks-list") + + page.goto(f"{base_url}/tasks", wait_until="networkidle") + # First Edit button opens the side sheet. + page.get_by_role("button", name="Edit").first.click() + page.wait_for_selector("text=Overrides", timeout=3000) + time.sleep(0.3) + screenshot(page, "task-edit-overrides") + + # Switch to the Middleware tab inside the same sheet. 
+ page.get_by_role("tab", name="Middleware").click() + page.wait_for_selector("text=demo.logging", timeout=3000) + time.sleep(0.3) + screenshot(page, "task-edit-middleware") + + context.close() + browser.close() + print(f"OK — captured {len(list(SCREENSHOT_DIR.glob('*.png')))} screenshots") + + +def login_and_screenshot_setup_then_login(page: Any, base_url: str) -> None: + """Capture the setup page on a fresh dashboard, then the login page, + then sign in so subsequent captures are authenticated.""" + # Use a *separate* throwaway DB just for the setup screenshot — the + # main demo queue already has a user, so /login would show the sign-in + # form. Easier than tearing down and re-seeding mid-run. + setup_url = _start_throwaway_dashboard() + page.goto(setup_url + "/login", wait_until="networkidle") + page.wait_for_selector("text=Create the first admin", timeout=3000) + time.sleep(0.3) + screenshot(page, "auth-setup") + + # Now the real login page on the seeded dashboard. + page.goto(base_url + "/login", wait_until="networkidle") + page.wait_for_selector("text=Sign in", timeout=3000) + time.sleep(0.3) + screenshot(page, "auth-login") + + # Authenticate so the rest of the captures run inside the AppShell. 
+ page.fill('input[id="login-username"]', ADMIN_USER) + page.fill('input[id="login-password"]', ADMIN_PASSWORD) + page.get_by_role("button", name="Sign in").click() + page.wait_for_url(f"{base_url}/", timeout=5000) + + +def _start_throwaway_dashboard() -> str: + """Spin up a second dashboard against a fresh empty DB just for the + setup screenshot.""" + tmpdir = tempfile.mkdtemp(prefix="taskito-docs-") + q = Queue(db_path=f"{tmpdir}/setup.db") + url, _server = start_dashboard(q) + return url + + +def capture_each( + page: Any, + base_url: str, + routes: list[tuple[str, str]], + *, + wait_for_text: dict[str, str] | None = None, +) -> None: + for route, name in routes: + url = base_url + route + page.goto(url, wait_until="networkidle") + expected = (wait_for_text or {}).get(route) + if expected is not None: + with contextlib.suppress(Exception): + page.wait_for_selector(f"text={expected}", timeout=3000) + time.sleep(0.3) + screenshot(page, name) + + +def capture_page(page: Any, url: str, name: str) -> None: + page.goto(url, wait_until="networkidle") + time.sleep(0.4) + screenshot(page, name) + + +def screenshot(page: Any, name: str) -> None: + out = SCREENSHOT_DIR / f"{name}.png" + page.screenshot(path=str(out), full_page=False) + print(f" • {out.name}") + + +def _first_webhook_id(base_url: str) -> str: + """Pull the demo webhook id straight from the API so the deep link + in the screenshot script stays in sync with whatever seed_queue + produced.""" + # NB: the dashboard is auth-gated, so we need the cookie. Simplest + # approach is a synchronous login via stdlib urllib. 
+ import urllib.parse + + login = json.dumps({"username": ADMIN_USER, "password": ADMIN_PASSWORD}).encode() + req = urllib.request.Request( + f"{base_url}/api/auth/login", + method="POST", + data=login, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req) as resp: + cookies = "; ".join( + urllib.parse.unquote(c.split(";", 1)[0]) + for c in resp.headers.get_all("Set-Cookie") or [] + ) + list_req = urllib.request.Request( + f"{base_url}/api/webhooks", headers={"Cookie": cookies} + ) + with urllib.request.urlopen(list_req) as resp: + items = json.loads(resp.read()) + return str(items[0]["id"]) + + +# ── Entry point ────────────────────────────────────────────────────── + + +def main(argv: list[str]) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--skip-capture", + action="store_true", + help="Seed the demo queue and start the dashboard but skip the " + "Playwright run — useful for poking the seeded data in a browser.", + ) + args = parser.parse_args(argv) + + tmpdir = tempfile.mkdtemp(prefix="taskito-docs-") + print(f"Demo DB: {tmpdir}/demo.db") + queue = Queue( + db_path=f"{tmpdir}/demo.db", + middleware=[LoggingMiddleware(), MetricsMiddleware()], + ) + seed_queue(queue) + + echo_url, _echo = start_echo_server() + print(f"Echo server: {echo_url}") + + base_url, _dash = start_dashboard(queue) + print(f"Dashboard: {base_url}") + + if args.skip_capture: + print("\nSkipping Playwright. Open the dashboard in a browser. 
Ctrl+C to exit.") + try: + threading.Event().wait() + except KeyboardInterrupt: + return 0 + return 0 + + try: + capture_all(base_url) + except Exception: + import traceback + + traceback.print_exc() + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/scripts/optimize_screenshots.py b/scripts/optimize_screenshots.py new file mode 100644 index 00000000..23104771 --- /dev/null +++ b/scripts/optimize_screenshots.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 +"""Crunch the dashboard screenshots so the docs site stays light. + +The capture script writes 2x HiDPI PNGs straight from Playwright, which +range from 50 KB to 350 KB each. This pass: + +1. Converts to ``P`` mode (256-colour palette) where appropriate — + screenshots are dominated by flat UI panels and large solid regions, + so palette quantisation typically halves the file size with no + visible loss. +2. Falls back to the original RGBA encoding if quantisation actually + *grows* the file (rare on dense screenshots). +3. Reports before/after sizes so we can spot regressions. + +Run after every screenshot regen: + + uv run --with pillow python scripts/optimize_screenshots.py +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +from PIL import Image + +REPO_ROOT = Path(__file__).resolve().parent.parent +SCREENSHOT_DIR = REPO_ROOT / "docs" / "public" / "screenshots" / "dashboard" + + +def optimize(path: Path) -> tuple[int, int]: + """Return ``(before_bytes, after_bytes)``.""" + before = path.stat().st_size + with Image.open(path) as img: + img.load() + # Quantise to a 256-colour palette. ``method=2`` is Pillow's fast + # octree quantiser; it is used here instead of the RGB default (median + # cut, ``method=0``) for these flat, synthetic UI screenshots.
+ quantised = img.convert("RGB").quantize(colors=256, method=2, dither=Image.Dither.NONE) + tmp = path.with_suffix(".opt.png") + quantised.save(tmp, format="PNG", optimize=True) + after = tmp.stat().st_size + if after < before: + tmp.replace(path) + return before, after + tmp.unlink() + return before, before + + +def main() -> int: + if not SCREENSHOT_DIR.exists(): + print(f"No screenshots at {SCREENSHOT_DIR}", file=sys.stderr) + return 1 + total_before = 0 + total_after = 0 + for png in sorted(SCREENSHOT_DIR.glob("*.png")): + before, after = optimize(png) + total_before += before + total_after += after + delta = (after - before) / before * 100 if before else 0.0 + print( + f"{png.name:35s} {before / 1024:7.1f} KB → {after / 1024:7.1f} KB" + f" ({delta:+5.1f}%)" + ) + print( + f"{'TOTAL':35s} {total_before / 1024:7.1f} KB → {total_after / 1024:7.1f} KB" + f" ({(total_after - total_before) / total_before * 100:+5.1f}%)" + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/dashboard/test_middleware_toggles.py b/tests/dashboard/test_middleware_toggles.py index 6713f098..cc93159e 100644 --- a/tests/dashboard/test_middleware_toggles.py +++ b/tests/dashboard/test_middleware_toggles.py @@ -196,6 +196,28 @@ def test_put_task_middleware_rejects_bad_body( assert exc_info.value.code == 400 +def test_put_task_middleware_handles_url_encoded_name( + dashboard: tuple[AuthedClient, Queue], +) -> None: + """Browser clients ``encodeURIComponent`` task names containing + ``<``, ``>``, ``/`` etc. The server has to decode the captured + group before looking up the disable list — otherwise the toggle + silently no-ops because the disable is keyed by one name but the + chain lookup uses another. 
Regression test for that path.""" + import urllib.parse + + client, queue = dashboard + name = next(c.name for c in queue._task_configs if c.name.endswith("alpha")) + encoded = urllib.parse.quote(name, safe="") + assert "%" in encoded, "test setup: pick a task whose qualname needs encoding" + result = client.put(f"/api/tasks/{encoded}/middleware/test.other", {"enabled": False}) + assert "test.other" in result["disabled"] + # The chain lookup uses the decoded name, so it must reflect the + # disable that was written by the encoded URL. + chain_names = {mw.name for mw in queue._get_middleware_chain(name)} + assert "test.other" not in chain_names + + def test_delete_task_middleware_clears_all( dashboard: tuple[AuthedClient, Queue], ) -> None: