354 changes: 354 additions & 0 deletions docs-next/content/docs/api-reference/canvas.mdx
@@ -0,0 +1,354 @@
---
title: Canvas
description: "Workflow primitives — Signature, chain, group, chord, chunks, starmap."
---

Canvas primitives for composing task workflows. Import directly from the package:

```python
from taskito import chain, group, chord, chunks, starmap
```

---

## Signature

A frozen task call spec — describes *what* to call and *with what arguments*,
without executing it.

### Creating signatures

```python
# Mutable signature — receives previous result in chains
sig = add.s(2, 3)

# Immutable signature — ignores previous result in chains
sig = add.si(2, 3)
```

### Fields

| Field | Type | Description |
|---|---|---|
| `task` | `TaskWrapper` | The task to call |
| `args` | `tuple` | Positional arguments |
| `kwargs` | `dict` | Keyword arguments |
| `options` | `dict` | Enqueue options (priority, queue, etc.) |
| `immutable` | `bool` | If `True`, ignores previous result in chains |

### `sig.apply()`

```python
sig.apply(queue: Queue | None = None) -> JobResult
```

Enqueue this signature immediately. If `queue` is `None`, uses the task's
parent queue.

```python
sig = add.s(2, 3)
job = sig.apply()
print(job.result(timeout=10)) # 5
```

### `await sig.apply_async()`

```python
await sig.apply_async(queue: Queue | None = None) -> JobResult
```

Async version of `apply()`. Safe to call from async contexts (FastAPI handlers,
etc.).

### Mutable vs immutable

In a [`chain`](#chain), the previous task's return value is **prepended** to a
mutable signature's args:

```python
# add.s(10) in a chain where previous step returned 5:
# → add(5, 10) = 15

# add.si(2, 3) in a chain:
# → add(2, 3) = 5 (always, regardless of previous result)
```
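The prepending rule can be modelled in plain Python without a running queue. The sketch below is illustrative only: `SketchSignature`, `resolve_args`, and `call` are hypothetical names and not part of taskito. It assumes the rule is exactly "previous result first, then the signature's own args".

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(frozen=True)
class SketchSignature:
    """Hypothetical stand-in for a signature; not taskito's own class."""

    func: Callable[..., Any]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    immutable: bool = False

    def resolve_args(self, previous: Any) -> tuple:
        # Immutable signatures keep their args untouched; mutable ones
        # get the previous step's result prepended.
        if self.immutable:
            return self.args
        return (previous, *self.args)

    def call(self, previous: Any) -> Any:
        return self.func(*self.resolve_args(previous), **self.kwargs)


def add(x, y):
    return x + y


# Mutable: behaves like add(5, 10)
mutable_result = SketchSignature(add, (10,)).call(previous=5)

# Immutable: behaves like add(2, 3), the previous result is ignored
immutable_result = SketchSignature(add, (2, 3), immutable=True).call(previous=5)
```

Here `mutable_result` is `15` and `immutable_result` is `5`, matching the two cases described above.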

---

## chain

Execute signatures sequentially, piping each result to the next.

### Constructor

```python
chain(*signatures: Signature)
```

Requires at least one signature.

### `chain.apply()`

```python
chain.apply(queue: Queue | None = None) -> JobResult
```

Execute the chain by enqueuing and waiting for each step sequentially. Returns
the [`JobResult`](/docs/api-reference/result) of the **last** step.

Each step's return value is prepended to the next mutable signature's args.
Immutable signatures (`task.si()`) receive their args as-is.

```python
@queue.task()
def double(x):
    return x * 2

@queue.task()
def add_ten(x):
    return x + 10

# double(3) → 6, then add_ten(6) → 16
result = chain(double.s(3), add_ten.s()).apply()
print(result.result(timeout=10)) # 16
```

<Mermaid
chart={`graph LR
A["double(3)"] -->|"6"| B["add_ten(6)"]
B -->|"16"| C["Result: 16"]`}
/>
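The data flow of a chain can be thought of as a left-to-right fold over its steps. The sketch below runs the steps as ordinary function calls in one process, so it shows only how results are threaded, not how jobs are enqueued. `run_chain_locally` and the `(func, args, immutable)` tuple layout are hypothetical, and it assumes the first step receives its args as-is because there is no previous result.

```python
from typing import Any, Callable

# Hypothetical step layout: (function, positional args, immutable flag)
Step = tuple[Callable[..., Any], tuple, bool]


def run_chain_locally(*steps: Step) -> Any:
    """Thread each step's return value into the next mutable step."""
    if not steps:
        raise ValueError("a chain requires at least one step")
    result: Any = None
    for index, (func, args, immutable) in enumerate(steps):
        if index == 0 or immutable:
            # No previous result to pass, or the step opted out of it.
            result = func(*args)
        else:
            # Previous result is prepended to the step's own args.
            result = func(result, *args)
    return result


def double(x):
    return x * 2


def add_ten(x):
    return x + 10


# double(3) -> 6, then add_ten(6) -> 16
chain_value = run_chain_locally((double, (3,), False), (add_ten, (), False))
```

Marking the second step immutable and giving it its own argument, for example `(add_ten, (1,), True)`, would make it compute `add_ten(1)` regardless of what `double` returned.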

### `await chain.apply_async()`

```python
await chain.apply_async(queue: Queue | None = None) -> JobResult
```

Async version of `apply()`. Awaits each step's result using `aresult()`
instead of blocking with `result()`. Safe to call from async contexts.

```python
result = await chain(double.s(3), add_ten.s()).apply_async()
value = await result.aresult(timeout=10) # 16
```

---

## group

Execute signatures in parallel and collect all results.

### Constructor

```python
group(*signatures: Signature)
```

Requires at least one signature.

### `group.apply()`

```python
group.apply(queue: Queue | None = None) -> list[JobResult]
```

Enqueue all signatures and return a list of
[`JobResult`](/docs/api-reference/result) handles. Jobs run concurrently
across available workers.

```python
jobs = group(
    add.s(1, 2),
    add.s(3, 4),
    add.s(5, 6),
).apply()

results = [j.result(timeout=10) for j in jobs]
print(results) # [3, 7, 11]
```

<Mermaid
chart={`graph LR
A["add(1,2)"] --> D["Results: [3, 7, 11]"]
B["add(3,4)"] --> D
C["add(5,6)"] --> D`}
/>

### `await group.apply_async()`

```python
await group.apply_async(queue: Queue | None = None) -> list[JobResult]
```

Async version of `apply()`. When `max_concurrency` is set, each wave of jobs
is awaited concurrently with `asyncio.gather` instead of blocking.
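The wave-by-wave pattern can be sketched with `asyncio` alone. This is not taskito's implementation, and the source does not show where `max_concurrency` is configured. `gather_in_waves` and `fake_job` are hypothetical helpers, and the sketch assumes a wave is simply the next `max_concurrency` jobs in submission order.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence


async def gather_in_waves(
    factories: Sequence[Callable[[], Awaitable[Any]]],
    max_concurrency: int,
) -> list:
    """Await jobs in fixed-size waves, keeping results in submission order."""
    if max_concurrency <= 0:
        raise ValueError("max_concurrency must be positive")
    results: list = []
    for start in range(0, len(factories), max_concurrency):
        wave = factories[start:start + max_concurrency]
        # Every job in the wave runs concurrently; the next wave starts
        # only after all of them have finished.
        results.extend(await asyncio.gather(*(make() for make in wave)))
    return results


async def fake_job(x: int, y: int) -> int:
    await asyncio.sleep(0)  # stand-in for waiting on a worker
    return x + y


wave_results = asyncio.run(
    gather_in_waves(
        [lambda: fake_job(1, 2), lambda: fake_job(3, 4), lambda: fake_job(5, 6)],
        max_concurrency=2,
    )
)
```

With `max_concurrency=2`, the first two jobs form one wave and the third runs alone, yet `wave_results` is still `[3, 7, 11]` because `asyncio.gather` preserves argument order.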

---

## chord

Run a group in parallel, then pass all results to a callback.

### Constructor

```python
chord(group_: group, callback: Signature)
```

| Parameter | Type | Description |
|---|---|---|
| `group_` | `group` | The group of tasks to run in parallel |
| `callback` | `Signature` | The task to call with all collected results |

### `chord.apply()`

```python
chord.apply(queue: Queue | None = None) -> JobResult
```

Execute the group, wait for all results, then run the callback with the list
of results prepended to its args (unless immutable). Returns the
[`JobResult`](/docs/api-reference/result) of the callback.

```python
import requests

@queue.task()
def fetch(url):
    return requests.get(url).text

@queue.task()
def merge(results):
    return "\n".join(results)

result = chord(
    group(fetch.s("https://a.com"), fetch.s("https://b.com")),
    merge.s(),
).apply()

combined = result.result(timeout=30)
```

<Mermaid
chart={`graph LR
A["fetch(a.com)"] --> C["merge([...])"]
B["fetch(b.com)"] --> C
C --> D["Combined result"]`}
/>

### `await chord.apply_async()`

```python
await chord.apply_async(queue: Queue | None = None) -> JobResult
```

Async version of `apply()`. Awaits all group results using `asyncio.gather`,
then enqueues the callback.
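The gather-then-callback shape can be illustrated with plain coroutines. In this sketch, `run_chord_locally` and `fake_fetch` are hypothetical, and the callback is called directly rather than enqueued, so it shows only the ordering and the argument passing described above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def run_chord_locally(
    header: Iterable[Awaitable[Any]],
    callback: Callable[..., Any],
    *callback_args: Any,
    immutable: bool = False,
) -> Any:
    """Await every header job, then hand the collected list to the callback."""
    results = list(await asyncio.gather(*header))
    if immutable:
        # An immutable callback ignores the collected results.
        return callback(*callback_args)
    # Otherwise the list of results is prepended to the callback's args.
    return callback(results, *callback_args)


async def fake_fetch(url: str) -> str:
    await asyncio.sleep(0)  # stand-in for network and worker latency
    return f"body of {url}"


def merge(results: list) -> str:
    return "\n".join(results)


async def main() -> str:
    return await run_chord_locally(
        [fake_fetch("https://a.com"), fake_fetch("https://b.com")],
        merge,
    )


combined = asyncio.run(main())
```

The callback sees the results in the same order as the header jobs, since `asyncio.gather` returns values in argument order rather than completion order.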

---

## chunks

Split an iterable into fixed-size chunks and process each chunk in parallel.

### Constructor

```python
chunks(task: TaskWrapper, items: list, chunk_size: int) -> group
```

| Parameter | Type | Description |
|---|---|---|
| `task` | `TaskWrapper` | The task to call for each chunk. Receives the chunk as a single positional argument. |
| `items` | `list` | The full list to split. Must be non-empty. |
| `chunk_size` | `int` | Items per chunk. Must be positive. |

Returns a [`group`](#group) of signatures — one per chunk. Apply it the same
way as any other group.

```python
@queue.task()
def process_batch(records):
    return [transform(r) for r in records]

records = load_records() # 10_000 items
jobs = chunks(process_batch, records, 100).apply() # → 100 parallel jobs

results = [j.result(timeout=60) for j in jobs]
```

Raises `ValueError` if `chunk_size <= 0` or `items` is empty.
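The splitting step can be pictured as plain list slicing. The helper below is a hypothetical sketch, not taskito's code. It mirrors the documented validation and assumes the final chunk is allowed to be shorter when the list length is not a multiple of `chunk_size`.

```python
def split_into_chunks(items: list, chunk_size: int) -> list[list]:
    """Slice items into consecutive chunks of at most chunk_size elements."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not items:
        raise ValueError("items must be non-empty")
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


# 10 items in chunks of 4: two full chunks and one short tail
batches = split_into_chunks(list(range(10)), 4)

# 10_000 items in chunks of 100: exactly 100 chunks
batch_count = len(split_into_chunks(list(range(10_000)), 100))
```

Each element of `batches` corresponds to the single positional argument one task invocation would receive.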

---

## starmap

Spread an iterable of argument tuples over parallel task invocations.

### Constructor

```python
starmap(task: TaskWrapper, args_list: list[tuple]) -> group
```

| Parameter | Type | Description |
|---|---|---|
| `task` | `TaskWrapper` | The task to call. Each tuple is unpacked into positional args. |
| `args_list` | `list[tuple]` | One tuple per invocation. Must be non-empty. |

Returns a [`group`](#group) of signatures, one per tuple. Equivalent to
`group(*(task.s(*a) for a in args_list))`.

```python
@queue.task()
def add(x, y):
    return x + y

jobs = starmap(add, [(1, 2), (3, 4), (5, 6)]).apply()
results = [j.result(timeout=10) for j in jobs] # [3, 7, 11]
```

Raises `ValueError` if `args_list` is empty.
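The unpacking behaviour is the same idea as `itertools.starmap`, except that each call is prepared first and run later. The sketch below uses `functools.partial` to stand in for a signature. `prepare_calls` is a hypothetical helper and nothing here touches a queue.

```python
from functools import partial
from typing import Any, Callable


def prepare_calls(func: Callable[..., Any], args_list: list[tuple]) -> list[partial]:
    """Build one deferred call per tuple, unpacking it into positional args."""
    if not args_list:
        raise ValueError("args_list must be non-empty")
    return [partial(func, *args) for args in args_list]


def add(x, y):
    return x + y


calls = prepare_calls(add, [(1, 2), (3, 4), (5, 6)])

# Running the deferred calls yields one result per tuple, in order.
starmap_results = [call() for call in calls]
```

Here `starmap_results` is `[3, 7, 11]`, the same values as in the example above.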

---

## Complete example

An ETL pipeline combining `chain`, `group`, and `chord`:

```python
from taskito import Queue, chain, group, chord

queue = Queue()

@queue.task()
def extract(source):
    return load_data(source)

@queue.task()
def transform(data):
    return clean(data)

@queue.task()
def aggregate(results):
    return merge_datasets(results)

@queue.task()
def load(data):
    save_to_warehouse(data)

# Extract from 3 sources in parallel, transform each,
# aggregate all results, then load
pipeline = chain(
    chord(
        group(
            chain(extract.s("db"), transform.s()),
            chain(extract.s("api"), transform.s()),
            chain(extract.s("csv"), transform.s()),
        ),
        aggregate.s(),
    ),
    load.s(),
)

result = pipeline.apply(queue)
```