75 changes: 75 additions & 0 deletions docs-next/content/docs/architecture/failure-model.mdx
@@ -0,0 +1,75 @@
---
title: Failure Model
description: "At-least-once delivery semantics and how taskito recovers from crashes, timeouts, and partial writes."
---

import { Callout } from "fumadocs-ui/components/callout";

Taskito provides **at-least-once delivery**. Here's what happens when things go wrong.

## Worker crash mid-task

The job stays in `running` status. The scheduler's stale reaper detects it after
`timeout_ms` elapses, marks it failed, and either retries it (if retries remain) or
moves it to the dead letter queue. No manual intervention is needed.

## Parent process crash

All worker threads stop. Jobs in `running` stay in that state until the next worker
starts and the stale reaper picks them up (after their `timeout_ms` has elapsed).
Jobs in `pending` are unaffected: they are dispatched normally on restart.

## Database unavailable

Scheduler polls fail without surfacing an exception; the error is only logged (via
`log::error!`). No new jobs are dispatched. In-flight jobs complete normally, and
their results are cached in memory until the database becomes available.

## Network partition (Postgres/Redis)

Same behavior as database unavailable. The scheduler retries on the next poll cycle
(default: every 50ms). Connection pools handle reconnection automatically.

## Duplicate execution

`claim_execution` prevents two workers from picking up the same job simultaneously.
But if a worker crashes *after* starting execution, the job will be retried —
potentially executing the same task twice. Design tasks to be
[idempotent](/docs/guides/reliability) to handle this safely.
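
One common way to make a task safe to run twice is to key its side effect on an idempotency key. The sketch below is illustrative only: the `charges` table, the `charge_once` helper, and the key format are hypothetical and are not part of taskito.

```python
import sqlite3


def make_store() -> sqlite3.Connection:
    """Create an in-memory table whose primary key is the idempotency key."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE charges (idempotency_key TEXT PRIMARY KEY, amount_cents INTEGER)"
    )
    return conn


def charge_once(conn: sqlite3.Connection, idempotency_key: str, amount_cents: int) -> bool:
    """Apply the charge at most once per key.

    Returns True if this call wrote the row, False if an earlier
    execution (for example, before a crash and retry) already did.
    """
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO charges (idempotency_key, amount_cents) VALUES (?, ?)",
            (idempotency_key, amount_cents),
        )
    return cur.rowcount == 1
```

If the task body calls `charge_once` with a key derived from its arguments, a retried execution becomes a no-op instead of a second charge.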

## Recovery timeline

<Mermaid
chart={`sequenceDiagram
participant C as Client
participant DB as Database
participant S as Scheduler
participant W as Worker

C->>DB: enqueue(job)
S->>DB: dequeue + claim_execution
S->>W: dispatch job
W->>W: execute task...
Note over W: Worker crashes at T=5s
Note over S: Scheduler continues polling...
Note over S: T=300s: reap_stale_jobs() detects<br/>job.started_at + timeout_ms < now
S->>DB: mark failed, schedule retry
S->>DB: dequeue (same job, retry_count=1)
S->>W: dispatch to different worker
W->>DB: complete + clear claim`}
/>

## Partial writes

If a task completes successfully but the result write to the database fails (e.g.,
database full, connection lost), the job stays in `running` status. The stale reaper
eventually marks it failed and retries it. The task will execute again — make sure
it's [idempotent](/docs/guides/reliability).

## Jobs without timeouts

<Callout type="warn">
If a job has no `timeout_ms` set and the worker crashes, the job stays in
`running` **forever**. The stale reaper only detects jobs that have exceeded their
timeout. Always set a timeout on production tasks.
</Callout>
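
The reaper condition from the recovery timeline (`job.started_at + timeout_ms < now`) and the no-timeout caveat can be sketched as follows. This is a simplified model, not taskito's implementation: the `RunningJob` dataclass is hypothetical, and representing a missing timeout as `None` is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RunningJob:
    job_id: str
    started_at_ms: int
    timeout_ms: Optional[int]  # None models "no timeout set" (assumption)


def find_stale(jobs: List[RunningJob], now_ms: int) -> List[str]:
    """Return ids of running jobs whose deadline has passed.

    Jobs without a timeout never match, which is why they can stay
    in `running` indefinitely after a crash.
    """
    return [
        job.job_id
        for job in jobs
        if job.timeout_ms is not None and job.started_at_ms + job.timeout_ms < now_ms
    ]
```
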
32 changes: 32 additions & 0 deletions docs-next/content/docs/architecture/job-lifecycle.mdx
@@ -0,0 +1,32 @@
---
title: Job Lifecycle
description: "Every job moves through a state machine from creation to completion (or death)."
---

Every job moves through a state machine from creation to completion (or death).

<Mermaid
chart={`stateDiagram-v2
[*] --> Pending: enqueue() / delay()
Pending --> Running: dequeued by scheduler
Pending --> Cancelled: cancel_job()
Running --> Complete: task returns successfully
Running --> Failed: task raises exception
Failed --> Pending: retry (count < max_retries)\nwith exponential backoff
Failed --> Dead: retries exhausted\nmoved to DLQ
Dead --> Pending: retry_dead()
Complete --> [*]
Cancelled --> [*]
Dead --> [*]: purge_dead()`}
/>
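
The `Failed` branch of the diagram (retry with exponential backoff, or move to the DLQ) might be modelled like this. The base delay, the cap, and the doubling factor are assumptions chosen for illustration; the source only states that backoff is exponential and that retries stop at `max_retries`.

```python
from typing import Optional, Tuple


def next_step(
    retry_count: int,
    max_retries: int,
    base_ms: int = 1_000,   # hypothetical base delay
    cap_ms: int = 60_000,   # hypothetical upper bound
) -> Tuple[str, Optional[int]]:
    """Decide what happens to a failed job.

    Returns ("pending", delay_ms) when a retry remains,
    or ("dead", None) when retries are exhausted.
    """
    if retry_count < max_retries:
        delay_ms = min(cap_ms, base_ms * 2 ** retry_count)
        return ("pending", delay_ms)
    return ("dead", None)
```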

## Status codes

| Status | Integer | Description |
|---|---|---|
| Pending | 0 | Waiting to be picked up |
| Running | 1 | Currently executing |
| Complete | 2 | Finished successfully |
| Failed | 3 | Last attempt failed (may retry) |
| Dead | 4 | All retries exhausted, in DLQ |
| Cancelled | 5 | Cancelled before execution |
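
For code that reads these integers directly, the table and the state diagram above can be mirrored in a small enum. The `JobStatus` and `can_transition` names are illustrative and are not taken from taskito's API; the integer values and transitions come from this page.

```python
from enum import IntEnum


class JobStatus(IntEnum):
    PENDING = 0
    RUNNING = 1
    COMPLETE = 2
    FAILED = 3
    DEAD = 4
    CANCELLED = 5


# Transitions copied from the state diagram; purge_dead() removes
# the job rather than moving it to another status.
ALLOWED = {
    JobStatus.PENDING: {JobStatus.RUNNING, JobStatus.CANCELLED},
    JobStatus.RUNNING: {JobStatus.COMPLETE, JobStatus.FAILED},
    JobStatus.FAILED: {JobStatus.PENDING, JobStatus.DEAD},
    JobStatus.DEAD: {JobStatus.PENDING},
    JobStatus.COMPLETE: set(),
    JobStatus.CANCELLED: set(),
}


def can_transition(src: JobStatus, dst: JobStatus) -> bool:
    """Return True if the diagram allows moving from src to dst."""
    return dst in ALLOWED[src]
```
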
11 changes: 10 additions & 1 deletion docs-next/content/docs/architecture/meta.json
@@ -1,4 +1,13 @@
{
"title": "Architecture",
"pages": ["overview"]
"pages": [
"overview",
"job-lifecycle",
"worker-pool",
"scheduler",
"storage",
"resources",
"failure-model",
"serialization"
]
}
46 changes: 41 additions & 5 deletions docs-next/content/docs/architecture/overview.mdx
@@ -1,10 +1,46 @@
---
title: Overview
description: "How taskito is built end to end."
description: "Hybrid Python/Rust architecture: Python API on top, Rust engine underneath."
---

import { Callout } from 'fumadocs-ui/components/callout';
taskito is a hybrid Python/Rust system. Python provides the user-facing API. Rust
handles all the heavy lifting: storage, scheduling, dispatch, rate limiting, and
worker management.

<Callout title="Phase 1 stub" type="info">
Content port pending. See the [Zensical source](https://github.com/ByteVeda/taskito/tree/master/docs) for current text.
</Callout>
<Mermaid
chart={`flowchart TD
subgraph py ["Python Layer"]
direction LR
Q["Queue"] --> IC["ArgumentInterceptor"]
TW["@queue.task()"] ~~~ RR["ResourceRuntime"]
end

subgraph rust ["Rust Core — PyO3"]
direction LR
PQ["PyQueue"] --> SCH["Scheduler"]
SCH --> WP["Worker Pool"]
SCH --> RL["Rate Limiter"]
end

subgraph storage ["Storage"]
direction LR
SQ[("SQLite")] ~~~ PG[("PostgreSQL")]
end

IC --> PQ
WP -->|"acquire GIL"| TW
SCH -->|"poll / update"| SQ
PQ -->|"INSERT"| SQ`}
/>

## Section overview

| Page | What it covers |
|---|---|
| [Job Lifecycle](/docs/architecture/job-lifecycle) | State machine, status codes, transitions |
| [Worker Pool](/docs/architecture/worker-pool) | Thread architecture, async dispatch, GIL management |
| [Storage Layer](/docs/architecture/storage) | SQLite pragmas, schema, indexes, Postgres differences |
| [Scheduler](/docs/architecture/scheduler) | Poll loop, dispatch flow, periodic tasks |
| [Resource System](/docs/architecture/resources) | Argument interception, DI, proxy reconstruction |
| [Failure Model](/docs/architecture/failure-model) | Crash recovery, duplicate execution, partial writes |
| [Serialization](/docs/architecture/serialization) | Pluggable serializers, format details |
44 changes: 44 additions & 0 deletions docs-next/content/docs/architecture/resources.mdx
@@ -0,0 +1,44 @@
---
title: Resource System
description: "The three-layer Python pipeline that intercepts arguments, injects DI, and reconstructs proxies."
---

The resource system is a three-layer Python pipeline that runs entirely outside Rust:

<Mermaid
chart={`flowchart TD
subgraph enqueue ["enqueue()"]
ARGS["Task arguments"] --> IC["ArgumentInterceptor"]
IC -->|PASS / CONVERT / REDIRECT| SER["Serializer"]
IC -->|PROXY| PX["ProxyHandler.deconstruct()"]
PX --> SER
end

SER -->|"serialized payload"| QUEUE[("Queue")]

subgraph worker ["Worker dispatch"]
DE["Deserialize"] --> RC["reconstruct_args()"]
RC --> FN["Task function"]
RT["ResourceRuntime"] -->|"inject"| FN
PX2["ProxyHandler.reconstruct()"] --> FN
end

QUEUE --> DE`}
/>

**Layer 1 — Argument Interception**: the `ArgumentInterceptor` walks every argument
before serialization, applying the strategy registered for its type. CONVERT types
are transformed to JSON-safe markers. REDIRECT types are replaced with a DI placeholder.
PROXY types are deconstructed by their handler. REJECT types raise an error in strict
mode.

**Layer 2 — Worker Resource Runtime**: `ResourceRuntime` initializes all registered
resources at worker startup in topological dependency order. At task dispatch time it
injects the requested resources (via `inject=` or `Inject["name"]` annotation) as
keyword arguments. Task-scoped resources are acquired from a semaphore pool and
returned after the task finishes.
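
Topological dependency order means every resource is initialized after the resources it depends on. A minimal sketch using the standard library's `graphlib` follows; the resource names and the `init_order` helper are hypothetical, and taskito's own ordering code may differ.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def init_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Return resource names so that dependencies come before dependents.

    `dependencies` maps each resource to the set of resources it needs.
    Raises graphlib.CycleError if the graph contains a cycle.
    """
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical resources: "repo" needs "db" and "cache"; "service" needs "repo".
EXAMPLE = {
    "db": set(),
    "cache": set(),
    "repo": {"db", "cache"},
    "service": {"repo"},
}
```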

**Layer 3 — Resource Proxies**: `ProxyHandler` implementations know how to deconstruct
live objects (file handles, HTTP sessions, cloud clients) into a JSON-serializable
recipe, and how to reconstruct them on the worker before the task function is called.
Recipes are optionally HMAC-signed for tamper detection.
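
HMAC signing of a JSON-serializable recipe can be sketched with the standard library. The envelope layout, the SHA-256 choice, and the function names are assumptions made for illustration, not taskito's actual wire format.

```python
import hashlib
import hmac
import json


def _canonical(recipe: dict) -> bytes:
    """Serialize deterministically so signer and verifier hash the same bytes."""
    return json.dumps(recipe, sort_keys=True, separators=(",", ":")).encode("utf-8")


def sign_recipe(recipe: dict, key: bytes) -> dict:
    """Wrap a recipe in an envelope carrying its HMAC-SHA256 signature."""
    sig = hmac.new(key, _canonical(recipe), hashlib.sha256).hexdigest()
    return {"recipe": recipe, "sig": sig}


def verify_recipe(envelope: dict, key: bytes) -> dict:
    """Return the recipe if the signature matches, else raise ValueError."""
    expected = hmac.new(key, _canonical(envelope["recipe"]), hashlib.sha256).hexdigest()
    # compare_digest avoids leaking the mismatch position through timing
    if not hmac.compare_digest(expected, envelope["sig"]):
        raise ValueError("recipe signature mismatch")
    return envelope["recipe"]
```
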
32 changes: 32 additions & 0 deletions docs-next/content/docs/architecture/scheduler.mdx
@@ -0,0 +1,32 @@
---
title: Scheduler
description: "The Tokio-based poll loop that dequeues, dispatches, retries, and reaps."
---

The scheduler runs in a dedicated Tokio single-threaded async runtime:

```
loop {
    sleep(50ms) or shutdown signal

    // Try to dequeue and dispatch a job
    try_dispatch()

    // Every ~100 iterations (~5s): reap timed-out jobs
    reap_stale()

    // Every ~60 iterations (~3s): check periodic tasks
    check_periodic()

    // Every ~1200 iterations (~60s): auto-cleanup old jobs
    auto_cleanup()
}
```
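
The cadence in the pseudocode follows from the 50 ms poll interval: 100 iterations is about 5 s, 60 is about 3 s, and 1200 is about 60 s. A small Python model of that arithmetic is shown below. It assumes a simple modulo counter starting at 1, which the source does not confirm, and `maintenance_due` is a hypothetical helper.

```python
from typing import List

POLL_INTERVAL_MS = 50
REAP_EVERY = 100       # ~5 s
PERIODIC_EVERY = 60    # ~3 s
CLEANUP_EVERY = 1200   # ~60 s


def maintenance_due(iteration: int) -> List[str]:
    """Return the maintenance steps that would run on a given loop iteration."""
    due = []
    if iteration % REAP_EVERY == 0:
        due.append("reap_stale")
    if iteration % PERIODIC_EVERY == 0:
        due.append("check_periodic")
    if iteration % CLEANUP_EVERY == 0:
        due.append("auto_cleanup")
    return due
```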

## Dispatch flow

1. `dequeue_from()` — atomically `SELECT` + `UPDATE` (pending → running) within a transaction.
2. Check rate limit — if over limit, reschedule 1s in the future.
3. Send job to worker pool via `tokio::sync::mpsc` channel.
4. Worker executes task, sends result back.
5. `handle_result()` — mark complete, schedule retry, or move to DLQ.
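
Step 1 is the part that keeps two schedulers from claiming the same job. A minimal SQLite sketch of an atomic select-and-update is shown below. The `jobs` schema, the column names, and the `dequeue` helper are assumptions for illustration; taskito's Rust storage layer is not reproduced here.

```python
import sqlite3
from typing import Optional

PENDING, RUNNING = 0, 1


def open_queue() -> sqlite3.Connection:
    """Open an in-memory queue; isolation_level=None lets us manage transactions."""
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE jobs ("
        "id INTEGER PRIMARY KEY, status INTEGER NOT NULL, "
        "scheduled_at INTEGER NOT NULL, started_at INTEGER)"
    )
    return conn


def dequeue(conn: sqlite3.Connection, now_ms: int) -> Optional[int]:
    """Claim the earliest due pending job and return its id, or None."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT id FROM jobs WHERE status = ? AND scheduled_at <= ? "
            "ORDER BY scheduled_at LIMIT 1",
            (PENDING, now_ms),
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE jobs SET status = ?, started_at = ? WHERE id = ?",
                (RUNNING, now_ms, row[0]),
            )
        conn.execute("COMMIT")
        return None if row is None else row[0]
    except Exception:
        conn.execute("ROLLBACK")
        raise
```
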
32 changes: 32 additions & 0 deletions docs-next/content/docs/architecture/serialization.mdx
@@ -0,0 +1,32 @@
---
title: Serialization
description: "Pluggable serializers for task arguments and results, with Cloudpickle as the default."
---

taskito uses a pluggable serializer for task arguments and results. The default is
`CloudpickleSerializer`, which supports lambdas, closures, and complex Python objects.

```python
from taskito import Queue, JsonSerializer

# Use JSON for simpler, cross-language payloads
queue = Queue(serializer=JsonSerializer())
```

## Built-in serializers

| Serializer | Format | Best for |
|---|---|---|
| `CloudpickleSerializer` (default) | Binary (pickle) | Complex Python objects, lambdas, closures |
| `JsonSerializer` | JSON | Simple types, cross-language interop, debugging |

## Custom serializers

Implement the `Serializer` protocol (`dumps(obj) -> bytes`, `loads(data) -> Any`) and
pass an instance to `Queue(serializer=...)`.
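
As a sketch of what such an implementation could look like, the class below compresses JSON with `zlib`. `CompressedJsonSerializer` is a hypothetical name, and the only interface it relies on is the `dumps`/`loads` pair described above.

```python
import json
import zlib
from typing import Any


class CompressedJsonSerializer:
    """JSON payloads compressed with zlib; handles JSON-safe values only."""

    def dumps(self, obj: Any) -> bytes:
        # JSON has no tuple type, so tuples are encoded as lists
        return zlib.compress(json.dumps(obj).encode("utf-8"))

    def loads(self, data: bytes) -> Any:
        return json.loads(zlib.decompress(data).decode("utf-8"))
```

Because JSON has no tuple type, an `(args, kwargs)` payload comes back as a two-element list, so a serializer like this only suits tasks whose arguments are JSON-safe.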

## What gets serialized

- **Arguments**: `serializer.dumps((args, kwargs))` — stored as BLOB in `payload`
- **Results**: `serializer.dumps(return_value)` — stored as BLOB in `result`
- **Periodic task args**: serialized at registration time, stored as BLOBs in
`periodic_tasks.args`