Merged
2 changes: 1 addition & 1 deletion benchmarks/broker_redis/bench_chicory.py
@@ -68,7 +68,7 @@ async def _run_batch(
workload_type: WorkloadType,
) -> BenchmarkResult:
task_func = _WORKLOAD_TASKS[workload_type]
-loop = asyncio.get_event_loop()
+loop = asyncio.get_running_loop()

logger.info("enqueuing tasks...")
enqueue_start = loop.time()
2 changes: 1 addition & 1 deletion benchmarks/broker_redis/bench_taskiq.py
@@ -71,7 +71,7 @@ async def _run_batch(
workload_type: WorkloadType,
) -> BenchmarkResult:
task_func = _WORKLOAD_TASKS[workload_type]
-loop = asyncio.get_event_loop()
+loop = asyncio.get_running_loop()

# startup per batch — FLUSHDB between batches destroys consumer groups
await broker.startup()
14 changes: 14 additions & 0 deletions benchmarks/docker-compose.yml
@@ -23,6 +23,20 @@ services:
networks:
- chicory

redis-insight:
image: redis/redisinsight:latest
stop_grace_period: 0s
ports:
- 5540:5540
depends_on:
redis:
condition: service_healthy
environment:
- RI_REDIS_HOST=redis
- RI_REDIS_PORT=6379
networks:
- chicory

prometheus:
image: prom/prometheus:v3.9.1
ports:
2 changes: 1 addition & 1 deletion benchmarks/framework/runner.py
@@ -54,7 +54,7 @@ async def run_workload(
enqueue_func: Any,
result_func: Any,
) -> BenchmarkResult:
-loop = asyncio.get_event_loop()
+loop = asyncio.get_running_loop()

logger.info(f"Running workload {workload_type.value} with {task_count} tasks")

152 changes: 152 additions & 0 deletions docs/user-guide/benchmarks.md
@@ -0,0 +1,152 @@
# Benchmarks

Chicory ships an internal benchmark suite in the `benchmarks/` directory that provides
an apples-to-apples comparison of **Chicory** vs **TaskIQ**, both using Redis as broker
and result backend.

## Prerequisites

- Docker & Docker Compose
- Python 3.11+ with [uv](https://docs.astral.sh/uv/)

The benchmark workspace is
a [uv workspace member](https://docs.astral.sh/uv/concepts/workspaces/)
with its own `pyproject.toml`. All commands should be run from the `benchmarks/`
directory (or via `benchmarks/Makefile`).

## Quick Start

```bash
# From the repository root:
cd benchmarks

# 1. Install benchmark dependencies
uv sync --dev --all-extras

# 2. Start Redis, Prometheus, and Grafana
make up

# 3. Terminal 1 — start a worker (pick one)
make chicory-worker # 4 processes × 32 concurrency = 128 concurrent
make taskiq-worker # 4 processes × 32 max-async-tasks = 128 concurrent

# 4. Terminal 2 — run benchmarks
make chicory-run-all # all workloads
make taskiq-run-all

# 5. View results
# Console — printed at end of each run
# Grafana — http://localhost:3000 (dashboard: "Chicory Benchmarks")
# Prometheus — http://localhost:9100
```

## What's Measured

Each benchmark iterates over batch sizes
`[8, 16, 32, 64, 128, 256, 1024, 2048, 4096, 8192, 16384]` and records:

| Metric | Description |
|---------------------------------|-------------------------------------------------------|
| **Enqueue duration** | Wall time to `gather()` all `delay()` / `kiq()` calls |
| **Dequeue duration** | Wall time to `gather()` all result retrievals |
| **Throughput** | `batch_size / (enqueue + dequeue)` |
| **Success / Failure / Invalid** | Per-batch result classification |

Redis is flushed (both db 0 and db 1) between batches so that no state carries over from one batch to the next.
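
The throughput figure can be reproduced from the two recorded durations. A minimal sketch (the `BatchTimings` container and its field names are illustrative, not Chicory's actual `BenchmarkResult`):

```python
from dataclasses import dataclass

@dataclass
class BatchTimings:
    # Hypothetical container; Chicory's real BenchmarkResult may differ.
    batch_size: int
    enqueue_duration: float  # seconds spent awaiting all delay()/kiq() calls
    dequeue_duration: float  # seconds spent awaiting all result retrievals

def throughput(t: BatchTimings) -> float:
    # Tasks/sec over the combined enqueue + dequeue window.
    return t.batch_size / (t.enqueue_duration + t.dequeue_duration)

print(throughput(BatchTimings(1024, 0.5, 1.5)))  # prints 512.0
```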

## Workload Types

| Workload | Task body | Purpose |
|-------------|-------------------------------------------|------------------------------------------|
| `increment` | `return value + 1` | Baseline — pure enqueue/dequeue overhead |
| `cpu_bound` | 10k iterations of `(x * 31 + 17) % (10**9 + 7)` | CPU-bound worker load |
| `io_bound` | `asyncio.sleep(0.01)` | I/O-bound / concurrency pressure |
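
The three task bodies can be sketched as plain coroutines (names and signatures are illustrative; the real bench files wrap these in each framework's task decorator):

```python
import asyncio

MOD = 10**9 + 7  # modulus used by the cpu_bound recurrence

async def increment(value: int) -> int:
    # Baseline: measures pure enqueue/dequeue overhead.
    return value + 1

async def cpu_bound(x: int) -> int:
    # 10k iterations of a cheap hash-like recurrence keep the worker's CPU busy.
    for _ in range(10_000):
        x = (x * 31 + 17) % MOD
    return x

async def io_bound() -> None:
    # Pure I/O wait: exercises worker concurrency, not CPU.
    await asyncio.sleep(0.01)
```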

Run individual workloads:

```bash
make chicory-run-increment
make chicory-run-cpu
make chicory-run-io
make chicory-run-all

# Same targets exist for taskiq:
make taskiq-run-all
```

## Fairness Controls

Both frameworks are configured to run under conditions that are as equivalent as possible:

| Setting | Chicory | TaskIQ |
|------------------------|-----------------------|-----------------------|
| Workers (processes) | 4 | 4 |
| Concurrency per worker | 32 | 32 |
| Broker | Redis Streams (db 0) | Redis Streams (db 0) |
| Result backend | Redis (db 1) | Redis (db 1) |
| Validation overhead | `ValidationMode.NONE` | N/A (none by default) |
| Connection pool | Default | Default |

## Reading the Results

### Console output

At the end of each run, a summary table is printed to the console showing
per-batch-size rows with enqueue duration, dequeue duration, throughput
(tasks/sec), and success/failure/invalid counts.

**What to look for:**

- **Throughput scaling**: throughput should increase with batch size up to a
plateau. If it drops, the bottleneck is usually Redis or result polling.
- **Enqueue vs dequeue split**: enqueue is typically fast; if dequeue dominates,
the result backend or polling interval may be the bottleneck.
- **Failures / invalid**: should be zero. Non-zero values indicate bugs or
resource exhaustion.

### Grafana dashboard

The auto-provisioned Grafana dashboard (at `http://localhost:3000`) has 9 panels:

1. Enqueue duration per batch
2. Dequeue duration per batch
3. Throughput (tasks/sec)
4. Success count
5. Failure count
6. Invalid count
7. Redis memory usage
8. Redis ops/sec
9. Redis connected clients

Use the template variables at the top to filter by **target** (`chicory`,
`taskiq`, or both) and **workload_type** (`increment`, `cpu_bound`, `io_bound`, or all).

## Monitoring Stack

| Service | Internal port | External port |
|----------------|---------------|---------------|
| Redis | 6379 | 6379 |
| Redis Exporter | 9121 | — |
| Prometheus | 9090 | **9100** |
| Grafana | 3000 | 3000 |

Prometheus is mapped to **9100** externally to avoid conflict with Chicory's
metrics endpoint on 9090.

## Cleanup

```bash
make down # Stop all containers and remove volumes
```

## Adding New Benchmarks

1. Create `benchmarks/broker_redis/bench_<name>.py` following the pattern in
existing bench files.
2. Define tasks, `_WORKLOAD_TASKS` mapping, `_flush_redis()`, `_run_batch()`.
3. Use `MetricsCollector.record_result(result, "<name>")` for Prometheus export.
4. Add `make <name>-worker` and `make <name>-run-*` targets to the Makefile.
5. Add a Prometheus scrape target in `monitor/prometheus.yml`.
6. Update the Grafana dashboard `target` template variable.

For full implementation details, see `benchmarks/README.md` in the repository.
27 changes: 14 additions & 13 deletions docs/user-guide/brokers/index.md
@@ -1,6 +1,7 @@
# Message Brokers

-Message brokers are the backbone of Chicory's task distribution system. They queue task messages and deliver them to workers for execution.
+Message brokers are the backbone of Chicory's task distribution system.
+They queue task messages and deliver them to workers for execution.

## Overview

@@ -74,17 +75,17 @@ Chicory supports two production-ready message brokers:

## Feature Comparison

-| Feature | Redis Streams | RabbitMQ |
-|---------|--------------|----------|
-| **Setup Complexity** | Simple | Moderate |
-| **Priority Queues** | No | Yes |
-| **Message Routing** | Basic | Advanced |
-| **Dead Letter Queue** | Manual | Native |
-| **Delayed Tasks** | Sorted Set | TTL + DLX |
-| **Monitoring** | Redis CLI | Management UI |
-| **Clustering** | Redis Cluster | Native |
-| **Persistence** | RDB/AOF | Built-in |
-| **Memory Usage** | Low | Moderate |
+| Feature               | Redis Streams | RabbitMQ      |
+|-----------------------|---------------|---------------|
+| **Setup Complexity**  | Simple        | Moderate      |
+| **Priority Queues**   | No            | Yes           |
+| **Message Routing**   | Basic         | Advanced      |
+| **Dead Letter Queue** | Manual        | Native        |
+| **Delayed Tasks**     | Sorted Set    | TTL + DLX     |
+| **Monitoring**        | Redis CLI     | Management UI |
+| **Clustering**        | Redis Cluster | Native        |
+| **Persistence**       | RDB/AOF       | Built-in      |
+| **Memory Usage**      | Low           | Moderate      |

## Basic Usage

@@ -192,7 +193,7 @@ Both brokers support delayed task execution:
from datetime import datetime, timedelta, UTC

# Schedule for specific time
-eta = datetime(2024, 12, 25, 9, 0, 0)
+eta = datetime(2024, 12, 25, 9, 0, 0, tzinfo=UTC)
message = TaskMessage(..., eta=eta)
await app.broker.publish(message)

85 changes: 64 additions & 21 deletions docs/user-guide/brokers/redis.md
@@ -1,6 +1,7 @@
# Redis Broker

-The Redis broker uses [Redis Streams](https://redis.io/docs/data-types/streams/) for task distribution. It's lightweight, easy to set up, and provides excellent performance for most use cases.
+The Redis broker uses [Redis Streams](https://redis.io/docs/data-types/streams/) for task distribution.
+It's lightweight, easy to set up, and provides excellent performance for most use cases.

## Installation

@@ -111,6 +112,35 @@ CHICORY_BROKER_REDIS_BLOCK_MS=5000
CHICORY_BROKER_REDIS_CLAIM_MIN_IDLE_MS=30000
```

### Delayed-Task Mover Settings

These settings control the background loop that promotes delayed tasks from the sorted set
to the main stream. See [Delayed Tasks / How It Works](#how-it-works-1) for details.

```python
config = RedisBrokerConfig(
# Minimum sleep between mover iterations (milliseconds).
# Used after a batch of tasks is promoted or as the lower
# bound when the next ETA is very close.
mover_min_sleep_ms=50, # default 50 ms

# Maximum sleep between mover iterations (milliseconds).
# Used when the delayed set is empty or the next ETA is
# far in the future.
mover_max_sleep_ms=1000, # default 1000 ms (1 second)
)
```

**Environment variables:**
```bash
CHICORY_BROKER_REDIS_MOVER_MIN_SLEEP_MS=50
CHICORY_BROKER_REDIS_MOVER_MAX_SLEEP_MS=1000
```

!!! tip "Tuning guidance"
- **Lower `mover_min_sleep_ms`** (e.g. 10) for sub-100ms ETA precision at the cost of slightly more Redis traffic.
- **Raise `mover_max_sleep_ms`** (e.g. 5000) to reduce idle polling when delayed tasks are infrequent.

### Stream Management

```python
@@ -299,36 +329,49 @@ eta = datetime.now(UTC) + timedelta(hours=1)

### How It Works

-Delayed tasks use a Redis sorted set:
+Delayed tasks use a Redis sorted set with the ETA timestamp as the score and the
+pickle-serialized task bytes as the value:

```
Sorted Set: chicory:delayed:default
-  Score (timestamp) | Value (task JSON)
-  1640433600.0      | {"id": "abc", "name": "task1", ...}
-  1640437200.0      | {"id": "def", "name": "task2", ...}
+  Score (timestamp) | Value (binary task data)
+  1640433600.0      | <pickle bytes for task "abc">
+  1640437200.0      | <pickle bytes for task "def">
```

-Workers periodically check for ready tasks:
+A **background mover loop** runs inside each worker and periodically promotes
+ready tasks to the main stream. Promotion is **atomic**: a server-side Lua
+script executes `ZRANGEBYSCORE`, `XADD`, and `ZREM` in a single `EVAL` call,
+so no two workers can promote the same task:

-```python
-# Every consumption cycle
-now = time.time()
-ready_tasks = ZRANGEBYSCORE chicory:delayed:default 0 {now}
-
-# Move to main stream
-for task in ready_tasks:
-    XADD chicory:stream:default {task}
-    ZREM chicory:delayed:default {task}
-```
+```
+EVAL lua_move_delayed 2 chicory:delayed:default chicory:stream:default <now>
+```
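
The script's effect can be modeled in plain Python, as an illustrative simulation of the ZRANGEBYSCORE, XADD, ZREM sequence over in-memory stand-ins (not the real Lua, and `move_delayed` is a hypothetical name):

```python
def move_delayed(delayed: dict[bytes, float], stream: list[bytes],
                 now: float, limit: int = 100) -> int:
    # delayed: member -> score (ETA timestamp), mimicking a Redis sorted set
    # stream:  append-only list, mimicking XADD to the main stream
    ready = sorted((m for m, s in delayed.items() if s <= now),
                   key=delayed.get)[:limit]  # ZRANGEBYSCORE 0 <now> LIMIT
    for member in ready:
        stream.append(member)   # XADD
        del delayed[member]     # ZREM
    return len(ready)
```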

The mover loop uses **adaptive polling** to balance responsiveness and
efficiency:

1. **Tasks just moved**: re-check quickly (`mover_min_sleep_ms`) in case
more messages are ready in a burst.
2. **Delayed tasks pending**: sleep until the earliest one matures, clamped
to `[mover_min_sleep_ms, mover_max_sleep_ms]`.
3. **No delayed tasks**: sleep for `mover_max_sleep_ms` to minimize idle
Redis traffic.

All exceptions except `CancelledError` are caught, so transient Redis errors
(pool exhaustion, network blips) cannot kill the loop; it logs the error and
retries after `mover_max_sleep_ms`.

See [Delayed-Task Mover Settings](#delayed-task-mover-settings) for tuning.

## Comparison with Celery

-| Feature | Chicory | Celery |
-|---------|---------|--------|
-| **Stream API** | XREADGROUP | Custom Lua |
-| **Consumer Groups** | Native | Custom |
-| **Delayed Tasks** | Sorted Set | Sorted Set |
-| **DLQ** | Stream-based | Custom |
+| Feature             | Chicory      | Celery     |
+|---------------------|--------------|------------|
+| **Stream API**      | XREADGROUP   | Custom Lua |
+| **Consumer Groups** | Native       | Custom     |
+| **Delayed Tasks**   | Sorted Set   | Sorted Set |
+| **DLQ**             | Stream-based | Custom     |

Chicory uses native Redis Streams features for simpler, more maintainable code.
