diff --git a/benchmarks/broker_redis/bench_chicory.py b/benchmarks/broker_redis/bench_chicory.py index a6d3909..ed09b1f 100644 --- a/benchmarks/broker_redis/bench_chicory.py +++ b/benchmarks/broker_redis/bench_chicory.py @@ -68,7 +68,7 @@ async def _run_batch( workload_type: WorkloadType, ) -> BenchmarkResult: task_func = _WORKLOAD_TASKS[workload_type] - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() logger.info("enqueuing tasks...") enqueue_start = loop.time() diff --git a/benchmarks/broker_redis/bench_taskiq.py b/benchmarks/broker_redis/bench_taskiq.py index 0044247..0b19a80 100644 --- a/benchmarks/broker_redis/bench_taskiq.py +++ b/benchmarks/broker_redis/bench_taskiq.py @@ -71,7 +71,7 @@ async def _run_batch( workload_type: WorkloadType, ) -> BenchmarkResult: task_func = _WORKLOAD_TASKS[workload_type] - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() # startup per batch — FLUSHDB between batches destroys consumer groups await broker.startup() diff --git a/benchmarks/docker-compose.yml b/benchmarks/docker-compose.yml index 329841c..23e96d8 100644 --- a/benchmarks/docker-compose.yml +++ b/benchmarks/docker-compose.yml @@ -23,6 +23,20 @@ services: networks: - chicory + redis-insight: + image: redis/redisinsight:latest + stop_grace_period: 0s + ports: + - 5540:5540 + depends_on: + redis: + condition: service_healthy + environment: + - RI_REDIS_HOST=redis + - RI_REDIS_PORT=6379 + networks: + - chicory + prometheus: image: prom/prometheus:v3.9.1 ports: diff --git a/benchmarks/framework/runner.py b/benchmarks/framework/runner.py index 940b0d3..e068cfd 100644 --- a/benchmarks/framework/runner.py +++ b/benchmarks/framework/runner.py @@ -54,7 +54,7 @@ async def run_workload( enqueue_func: Any, result_func: Any, ) -> BenchmarkResult: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() logger.info(f"Running workload {workload_type.value} with {task_count} tasks") diff --git a/docs/user-guide/benchmarks.md 
b/docs/user-guide/benchmarks.md new file mode 100644 index 0000000..aa54285 --- /dev/null +++ b/docs/user-guide/benchmarks.md @@ -0,0 +1,152 @@ +# Benchmarks + +Chicory ships an internal benchmark suite in the `benchmarks/` directory that provides +an apples-to-apples comparison of **Chicory** vs **TaskIQ**, both using Redis as broker +and result backend. + +## Prerequisites + +- Docker & Docker Compose +- Python 3.11+ with [uv](https://docs.astral.sh/uv/) + +The benchmark workspace is +a [uv workspace member](https://docs.astral.sh/uv/concepts/workspaces/) +with its own `pyproject.toml`. All commands should be run from the `benchmarks/` +directory (or via `benchmarks/Makefile`). + +## Quick Start + +```bash +# From the repository root: +cd benchmarks + +# 1. Install benchmark dependencies +uv sync --dev --all-extras + +# 2. Start Redis, Prometheus, and Grafana +make up + +# 3. Terminal 1 — start a worker (pick one) +make chicory-worker # 4 processes × 32 concurrency = 128 concurrent +make taskiq-worker # 4 processes × 32 max-async-tasks = 128 concurrent + +# 4. Terminal 2 — run benchmarks +make chicory-run-all # all workloads +make taskiq-run-all + +# 5. View results +# Console — printed at end of each run +# Grafana — http://localhost:3000 (dashboard: "Chicory Benchmarks") +# Prometheus — http://localhost:9100 +``` + +## What's Measured + +Each benchmark iterates over batch sizes +`[8, 16, 32, 64, 128, 256, 1024, 2048, 4096, 8192, 16384]` and records: + +| Metric | Description | +|---------------------------------|-------------------------------------------------------| +| **Enqueue duration** | Wall time to `gather()` all `delay()` / `kiq()` calls | +| **Dequeue duration** | Wall time to `gather()` all result retrievals | +| **Throughput** | `batch_size / (enqueue + dequeue)` | +| **Success / Failure / Invalid** | Per-batch result classification | + +Redis is flushed (both db 0 and db 1) between batches to prevent data leakage. 
+ +## Workload Types + +| Workload | Task body | Purpose | +|-------------|-------------------------------------------|------------------------------------------| +| `increment` | `return value + 1` | Baseline — pure enqueue/dequeue overhead | +| `cpu_bound` | 10k iterations of `(x * 31 + 17) % 1e9+7` | CPU-bound worker load | +| `io_bound` | `asyncio.sleep(0.01)` | I/O-bound / concurrency pressure | + +Run individual workloads: + +```bash +make chicory-run-increment +make chicory-run-cpu +make chicory-run-io +make chicory-run-all + +# Same targets exist for taskiq: +make taskiq-run-all +``` + +## Fairness Controls + +Both frameworks try as much as possible to run under equivalent conditions: + +| Setting | Chicory | TaskIQ | +|------------------------|-----------------------|-----------------------| +| Workers (processes) | 4 | 4 | +| Concurrency per worker | 32 | 32 | +| Broker | Redis Streams (db 0) | Redis Streams (db 0) | +| Result backend | Redis (db 1) | Redis (db 1) | +| Validation overhead | `ValidationMode.NONE` | N/A (none by default) | +| Connection pool | Default | Default | + +## Reading the Results + +### Console output + +At the end of each run, a summary table is printed to the console showing +per-batch-size rows with enqueue duration, dequeue duration, throughput +(tasks/sec), and success/failure/invalid counts. + +**What to look for:** + +- **Throughput scaling**: throughput should increase with batch size up to a + plateau. If it drops, the bottleneck is usually Redis or result polling. +- **Enqueue vs dequeue split**: enqueue is typically fast; if dequeue dominates, + the result backend or polling interval may be the bottleneck. +- **Failures / invalid**: should be zero. Non-zero values indicate bugs or + resource exhaustion. + +### Grafana dashboard + +The auto-provisioned Grafana dashboard (at `http://localhost:3000`) has 9 panels: + +1. Enqueue duration per batch +2. Dequeue duration per batch +3. Throughput (tasks/sec) +4. 
Success count +5. Failure count +6. Invalid count +7. Redis memory usage +8. Redis ops/sec +9. Redis connected clients + +Use the template variables at the top to filter by **target** (`chicory`, +`taskiq`, or both) and **workload_type** (`increment`, `cpu_bound`, `io_bound`, or all). + +## Monitoring Stack + +| Service | Internal port | External port | +|----------------|---------------|---------------| +| Redis | 6379 | 6379 | +| Redis Exporter | 9121 | — | +| Prometheus | 9090 | **9100** | +| Grafana | 3000 | 3000 | + +Prometheus is mapped to **9100** externally to avoid conflict with Chicory's +metrics endpoint on 9090. + +## Cleanup + +```bash +make down # Stop all containers and remove volumes +``` + +## Adding New Benchmarks + +1. Create `benchmarks/broker_redis/bench_<target>.py` following the pattern in + existing bench files. +2. Define tasks, `_WORKLOAD_TASKS` mapping, `_flush_redis()`, `_run_batch()`. +3. Use `MetricsCollector.record_result(result, "<target>")` for Prometheus export. +4. Add `make <target>-worker` and `make <target>-run-*` targets to the Makefile. +5. Add a Prometheus scrape target in `monitor/prometheus.yml`. +6. Update the Grafana dashboard `target` template variable. + +For full implementation details, see `benchmarks/README.md` in the repository. diff --git a/docs/user-guide/brokers/index.md b/docs/user-guide/brokers/index.md index 9b04f17..e3fb4ea 100644 --- a/docs/user-guide/brokers/index.md +++ b/docs/user-guide/brokers/index.md @@ -1,6 +1,7 @@ # Message Brokers -Message brokers are the backbone of Chicory's task distribution system. They queue task messages and deliver them to workers for execution. +Message brokers are the backbone of Chicory's task distribution system. +They queue task messages and deliver them to workers for execution. 
## Overview @@ -74,17 +75,17 @@ Chicory supports two production-ready message brokers: ## Feature Comparison -| Feature | Redis Streams | RabbitMQ | -|---------|--------------|----------| -| **Setup Complexity** | Simple | Moderate | -| **Priority Queues** | No | Yes | -| **Message Routing** | Basic | Advanced | -| **Dead Letter Queue** | Manual | Native | -| **Delayed Tasks** | Sorted Set | TTL + DLX | -| **Monitoring** | Redis CLI | Management UI | -| **Clustering** | Redis Cluster | Native | -| **Persistence** | RDB/AOF | Built-in | -| **Memory Usage** | Low | Moderate | +| Feature | Redis Streams | RabbitMQ | +|-----------------------|---------------|---------------| +| **Setup Complexity** | Simple | Moderate | +| **Priority Queues** | No | Yes | +| **Message Routing** | Basic | Advanced | +| **Dead Letter Queue** | Manual | Native | +| **Delayed Tasks** | Sorted Set | TTL + DLX | +| **Monitoring** | Redis CLI | Management UI | +| **Clustering** | Redis Cluster | Native | +| **Persistence** | RDB/AOF | Built-in | +| **Memory Usage** | Low | Moderate | ## Basic Usage @@ -192,7 +193,7 @@ Both brokers support delayed task execution: from datetime import datetime, timedelta, UTC # Schedule for specific time -eta = datetime(2024, 12, 25, 9, 0, 0) +eta = datetime(2024, 12, 25, 9, 0, 0, tzinfo=UTC) message = TaskMessage(..., eta=eta) await app.broker.publish(message) diff --git a/docs/user-guide/brokers/redis.md b/docs/user-guide/brokers/redis.md index 6068d6c..1bda770 100644 --- a/docs/user-guide/brokers/redis.md +++ b/docs/user-guide/brokers/redis.md @@ -1,6 +1,7 @@ # Redis Broker -The Redis broker uses [Redis Streams](https://redis.io/docs/data-types/streams/) for task distribution. It's lightweight, easy to set up, and provides excellent performance for most use cases. +The Redis broker uses [Redis Streams](https://redis.io/docs/data-types/streams/) for task distribution. +It's lightweight, easy to set up, and provides excellent performance for most use cases. 
## Installation @@ -111,6 +112,35 @@ CHICORY_BROKER_REDIS_BLOCK_MS=5000 CHICORY_BROKER_REDIS_CLAIM_MIN_IDLE_MS=30000 ``` +### Delayed-Task Mover Settings + +These settings control the background loop that promotes delayed tasks from the sorted set +to the main stream. See [Delayed Tasks / How It Works](#how-it-works-1) for details. + +```python +config = RedisBrokerConfig( + # Minimum sleep between mover iterations (milliseconds). + # Used after a batch of tasks is promoted or as the lower + # bound when the next ETA is very close. + mover_min_sleep_ms=50, # default 50 ms + + # Maximum sleep between mover iterations (milliseconds). + # Used when the delayed set is empty or the next ETA is + # far in the future. + mover_max_sleep_ms=1000, # default 1000 ms (1 second) +) +``` + +**Environment variables:** +```bash +CHICORY_BROKER_REDIS_MOVER_MIN_SLEEP_MS=50 +CHICORY_BROKER_REDIS_MOVER_MAX_SLEEP_MS=1000 +``` + +!!! tip "Tuning guidance" + - **Lower `mover_min_sleep_ms`** (e.g. 10) for sub-100ms ETA precision at the cost of slightly more Redis traffic. + - **Raise `mover_max_sleep_ms`** (e.g. 5000) to reduce idle polling when delayed tasks are infrequent. + ### Stream Management ```python @@ -299,36 +329,49 @@ eta = datetime.now(UTC) + timedelta(hours=1) ### How It Works -Delayed tasks use a Redis sorted set: +Delayed tasks use a Redis sorted set with the ETA timestamp as the score and the +pickle-serialized task bytes as the value: ``` Sorted Set: chicory:delayed:default -Score (timestamp) | Value (task JSON) -1640433600.0 | {"id": "abc", "name": "task1", ...} -1640437200.0 | {"id": "def", "name": "task2", ...} +Score (timestamp) | Value (binary task data) +1640433600.0 | +1640437200.0 | ``` -Workers periodically check for ready tasks: +A **background mover loop** runs inside each worker and periodically promotes +ready tasks to the main stream. 
Promotion is **atomic**: a server-side Lua +script executes `ZRANGEBYSCORE`, `XADD`, and `ZREM` in a single `EVAL` call, +so no two workers can promote the same task: -```python -# Every consumption cycle -now = time.time() -ready_tasks = ZRANGEBYSCORE chicory:delayed:default 0 {now} - -# Move to main stream -for task in ready_tasks: - XADD chicory:stream:default {task} - ZREM chicory:delayed:default {task} ``` +EVAL lua_move_delayed 2 chicory:delayed:default chicory:stream:default +``` + +The mover loop uses **adaptive polling** to balance responsiveness and +efficiency: + +1. **Tasks just moved**: re-check quickly (`mover_min_sleep_ms`) in case + more messages are ready in a burst. +2. **Delayed tasks pending**: sleep until the earliest one matures, clamped + to `[mover_min_sleep_ms, mover_max_sleep_ms]`. +3. **No delayed tasks**: sleep for `mover_max_sleep_ms` to minimize idle + Redis traffic. + +All exceptions except `CancelledError` are caught, so transient Redis errors +(pool exhaustion, network blips) cannot kill the loop; it logs the error and +retries after `mover_max_sleep_ms`. + +See [Delayed-Task Mover Settings](#delayed-task-mover-settings) for tuning. ## Comparison with Celery -| Feature | Chicory | Celery | -|---------|---------|--------| -| **Stream API** | XREADGROUP | Custom Lua | -| **Consumer Groups** | Native | Custom | -| **Delayed Tasks** | Sorted Set | Sorted Set | -| **DLQ** | Stream-based | Custom | +| Feature | Chicory | Celery | +|---------------------|--------------|------------| +| **Stream API** | XREADGROUP | Custom Lua | +| **Consumer Groups** | Native | Custom | +| **Delayed Tasks** | Sorted Set | Sorted Set | +| **DLQ** | Stream-based | Custom | Chicory uses native Redis Streams features for simpler, more maintainable code. 
diff --git a/docs/user-guide/configuration.md b/docs/user-guide/configuration.md index 08b98fb..d51c991 100644 --- a/docs/user-guide/configuration.md +++ b/docs/user-guide/configuration.md @@ -1,6 +1,7 @@ # Configuration -Chicory provides a flexible configuration system using Pydantic Settings with support for environment variables, .env files, and programmatic configuration. +Chicory provides a flexible configuration system using Pydantic Settings with support for +environment variables, .env files, and programmatic configuration. ## Configuration Priority @@ -173,6 +174,10 @@ config = RedisBrokerConfig( block_ms=5000, claim_min_idle_ms=30000, + # Delayed-task mover loop + mover_min_sleep_ms=50, # Minimum polling interval (ms) + mover_max_sleep_ms=1000, # Maximum polling interval (ms) + # Stream management max_stream_length=100000, dlq_max_length=10000, @@ -189,6 +194,8 @@ CHICORY_BROKER_REDIS_PASSWORD=secret CHICORY_BROKER_REDIS_CONSUMER_GROUP=chicory-workers CHICORY_BROKER_REDIS_BLOCK_MS=5000 CHICORY_BROKER_REDIS_CLAIM_MIN_IDLE_MS=30000 +CHICORY_BROKER_REDIS_MOVER_MIN_SLEEP_MS=50 +CHICORY_BROKER_REDIS_MOVER_MAX_SLEEP_MS=1000 CHICORY_BROKER_REDIS_MAX_STREAM_LENGTH=100000 CHICORY_BROKER_REDIS_DLQ_MAX_LENGTH=10000 CHICORY_BROKER_REDIS_KEY_PREFIX=chicory @@ -325,7 +332,8 @@ CHICORY_BACKEND_POSTGRES_POOL_TIMEOUT=30 ### Other Backends !!! warning "Experimental" - MySQL, SQLite, and MSSQL backends are **experimental and not tested**. They may work but are not guaranteed to be stable. + MySQL, SQLite, and MSSQL backends are **experimental and not tested**. + They may work but are not guaranteed to be stable. MySQL, SQLite, and MSSQL backends have similar configuration options to PostgreSQL. 
diff --git a/mkdocs.yml b/mkdocs.yml index 889c7c3..ef088e8 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -163,6 +163,7 @@ nav: - Backends: - user-guide/backends/index.md - Configuration: user-guide/configuration.md + - Benchmarks: user-guide/benchmarks.md - API Reference: - api/index.md - Chicory: api/chicory.md diff --git a/src/chicory/backend/redis.py b/src/chicory/backend/redis.py index c995342..f66e6b1 100644 --- a/src/chicory/backend/redis.py +++ b/src/chicory/backend/redis.py @@ -132,17 +132,23 @@ async def get_active_workers(self) -> list[WorkerStats]: worker_ids: set[bytes] = await self._client.smembers(self._workers_set_key()) # ty:ignore[invalid-await] - active_workers = [] - for worker_id_bytes in worker_ids: - worker_id = ( - worker_id_bytes.decode() - if isinstance(worker_id_bytes, bytes) - else worker_id_bytes - ) + # Use pipeline to batch all heartbeat lookups (avoid N+1 queries) + decoded_ids = [ + wid.decode() if isinstance(wid, bytes) else wid for wid in worker_ids + ] + + if not decoded_ids: + return [] - heartbeat = await self.get_heartbeat(worker_id) - if heartbeat: - active_workers.append(heartbeat) + async with self._client.pipeline(transaction=False) as pipe: + for worker_id in decoded_ids: + pipe.get(self._heartbeat_key(worker_id)) + results = await pipe.execute() + + active_workers = [] + for data in results: + if data: + active_workers.append(WorkerStats.model_validate_json(data)) return active_workers diff --git a/src/chicory/broker/rabbitmq.py b/src/chicory/broker/rabbitmq.py index ff165ee..ac02830 100644 --- a/src/chicory/broker/rabbitmq.py +++ b/src/chicory/broker/rabbitmq.py @@ -260,7 +260,7 @@ async def _publish_delayed( if not message.eta: return - delay_seconds = (message.eta - datetime.now(UTC)).total_seconds() + delay_seconds = max(0, (message.eta - datetime.now(UTC)).total_seconds()) await self._declare_delayed_infrastructure(channel, queue) @@ -268,7 +268,7 @@ async def _publish_delayed( await channel.default_exchange.publish( 
aio_pika.Message( body=data, - content_type="application/json", + content_type="application/octet-stream", delivery_mode=aio_pika.DeliveryMode.PERSISTENT if self.durable_queues else aio_pika.DeliveryMode.NOT_PERSISTENT, @@ -373,7 +373,7 @@ async def publish(self, message: TaskMessage, queue: str = DEFAULT_QUEUE) -> Non await channel.default_exchange.publish( aio_pika.Message( body=data, - content_type="application/json", + content_type="application/octet-stream", delivery_mode=aio_pika.DeliveryMode.PERSISTENT if self.durable_queues else aio_pika.DeliveryMode.NOT_PERSISTENT, @@ -447,23 +447,21 @@ async def consume(self, queue: str = DEFAULT_QUEUE) -> AsyncGenerator[TaskEnvelo async def ack(self, envelope: TaskEnvelope, queue: str = DEFAULT_QUEUE) -> None: """Acknowledge successful processing of a message.""" - if self.delivery_mode == DeliveryMode.AT_LEAST_ONCE: - raw_message = envelope._raw_message - if not isinstance(raw_message, AbstractIncomingMessage): - raise RuntimeError("Invalid raw message for acknowledgment") - if raw_message and not raw_message.processed: - await raw_message.ack() + raw_message = envelope._raw_message + if not isinstance(raw_message, AbstractIncomingMessage): + raise RuntimeError("Invalid raw message for acknowledgment") + if not raw_message.processed: + await raw_message.ack() async def nack( self, envelope: TaskEnvelope, requeue: bool = True, queue: str = DEFAULT_QUEUE ) -> None: """Negatively acknowledge a message, optionally requeuing it.""" - if self.delivery_mode == DeliveryMode.AT_LEAST_ONCE: - raw_message = envelope._raw_message - if not isinstance(raw_message, AbstractIncomingMessage): - raise RuntimeError("Invalid raw message for negative acknowledgment") - if raw_message and not raw_message.processed: - await raw_message.nack(requeue=requeue) + raw_message = envelope._raw_message + if not isinstance(raw_message, AbstractIncomingMessage): + raise RuntimeError("Invalid raw message for negative acknowledgment") + if not 
raw_message.processed: + await raw_message.nack(requeue=requeue) def stop(self) -> None: """Stop consuming messages.""" @@ -521,7 +519,7 @@ async def _manual_move_to_dlq( await channel.default_exchange.publish( aio_pika.Message( body=DLQData.dumps(dlq_data), - content_type="application/json", + content_type="application/octet-stream", delivery_mode=aio_pika.DeliveryMode.PERSISTENT if self.durable_queues else aio_pika.DeliveryMode.NOT_PERSISTENT, diff --git a/src/chicory/broker/redis.py b/src/chicory/broker/redis.py index b629c43..ca9ac90 100644 --- a/src/chicory/broker/redis.py +++ b/src/chicory/broker/redis.py @@ -1,6 +1,8 @@ from __future__ import annotations import asyncio +import contextlib +import logging import uuid from datetime import UTC, datetime from typing import TYPE_CHECKING, Any, cast @@ -15,6 +17,26 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator, Awaitable +_logger = logging.getLogger("chicory.broker.redis") + +# Lua script: atomically move delayed tasks whose score <= now from the +# sorted set (KEYS[1]) to the stream (KEYS[2]). Returns the number of +# tasks moved. Because the ZRANGEBYSCORE + ZREM happens inside a single +# EVAL, no two workers can promote the same task. 
+_LUA_MOVE_DELAYED = """ +local delayed_key = KEYS[1] +local stream_key = KEYS[2] +local now = ARGV[1] +local ready = redis.call('ZRANGEBYSCORE', delayed_key, 0, now) +local moved = 0 +for _, data in ipairs(ready) do + redis.call('XADD', stream_key, '*', 'data', data) + redis.call('ZREM', delayed_key, data) + moved = moved + 1 +end +return moved +""" + class RedisBroker(Broker): """ @@ -34,6 +56,9 @@ def __init__( self.max_stream_length = config.max_stream_length self.dlq_max_length = config.dlq_max_length + self.mover_min_sleep_secs = config.mover_min_sleep_ms / 1000 + self.mover_max_sleep_secs = config.mover_max_sleep_ms / 1000 + self.delivery_mode = delivery_mode self.consumer_name = consumer_name or f"worker-{uuid.uuid4().hex[:8]}" @@ -111,24 +136,89 @@ async def publish(self, message: TaskMessage, queue: str = DEFAULT_QUEUE) -> Non await self._client.xadd(stream_key, **kwargs) - async def _move_ready_delayed(self, queue: str) -> None: - """Move delayed tasks that are ready to the stream.""" + async def _move_ready_delayed(self, queue: str) -> int: + """ + Atomically move delayed tasks that are ready to the stream. + + Uses a Lua script so that the read-from-sorted-set and write-to-stream + happen in a single Redis operation. This prevents multiple workers from + promoting the same task concurrently (the PERF-1 race condition). + + Returns: + The number of tasks that were moved. 
+ """ if not self._client: - return + return 0 now = datetime.now(UTC).timestamp() delayed_key = self._delayed_key(queue) stream_key = self._stream_key(queue) - # Get all tasks with score <= now - ready = await self._client.zrangebyscore(delayed_key, 0, now) + moved: int = await self._client.eval( # ty: ignore[invalid-await] + _LUA_MOVE_DELAYED, 2, delayed_key, stream_key, now + ) + return moved - if ready: - pipe = self._client.pipeline() - for task_data in ready: - pipe.xadd(stream_key, {"data": task_data}) - pipe.zrem(delayed_key, task_data) - await pipe.execute() + async def _next_delayed_score(self, queue: str) -> float | None: + """Return the ETA timestamp of the earliest delayed task, or None.""" + if not self._client: + return None + + items = await self._client.zrange( + self._delayed_key(queue), + 0, + 0, + withscores=True, + ) + if items: + _member, score = items[0] + return float(score) + + return None + + async def _delayed_mover_loop(self, queue: str) -> None: + """Background loop: promote ready delayed tasks to the stream. + + The loop adapts its polling interval based on the next known ETA: + + * **Tasks just moved** — re-check quickly (mover_min_sleep_secs) in + case more messages are ready in a burst. + * **Delayed tasks pending** — sleep until the earliest one matures + (clamped to [mover_min_sleep_secs, mover_max_sleep_secs]). + * **No delayed tasks** — sleep for mover_max_sleep_secs to minimize + idle Redis traffic. + + All exceptions except ``CancelledError`` are caught so transient Redis + errors (pool exhaustion, network blips) cannot kill the loop. + """ + try: + while self._running: + try: + moved = await self._move_ready_delayed(queue) + if moved: + # More messages may be ready right behind; re-check fast. + await asyncio.sleep(self.mover_min_sleep_secs) + continue + + # Peek at the next delayed task to decide how long to + # sleep. 
+ score = await self._next_delayed_score(queue) + if score is not None: + wait = score - datetime.now(UTC).timestamp() + wait = max(self.mover_min_sleep_secs, wait) + wait = min(self.mover_max_sleep_secs, wait) + else: + wait = self.mover_max_sleep_secs + + await asyncio.sleep(wait) + + except asyncio.CancelledError: + raise + except Exception: + _logger.exception("delayed-mover iteration failed, will retry") + await asyncio.sleep(self.mover_max_sleep_secs) + except asyncio.CancelledError: + pass async def _claim_stale_messages(self, queue: str) -> list[TaskEnvelope]: """Claim messages that have been pending too long (dead consumer recovery).""" @@ -179,56 +269,61 @@ async def consume(self, queue: str = DEFAULT_QUEUE) -> AsyncGenerator[TaskEnvelo stream_key = self._stream_key(queue) last_claim_check = 0.0 - while self._running: - # Periodically move ready delayed tasks - await self._move_ready_delayed(queue) - - # Periodically check for stale messages to reclaim (at-least-once) - now = asyncio.get_event_loop().time() - if ( - self.delivery_mode == DeliveryMode.AT_LEAST_ONCE - and now - last_claim_check > 10 - ): - stale = await self._claim_stale_messages(queue) - for envelope in stale: - yield envelope - last_claim_check = now - - try: - # XREADGROUP: read new messages for this consumer - # Use ">" to read only new messages not yet delivered to any consumer - result = await self._client.xreadgroup( - groupname=self.consumer_group, - consumername=self.consumer_name, - streams={stream_key: ">"}, - count=1, - block=self.block_ms, - ) - - if result: - for stream_name, messages in result: - for msg_id, fields in messages: - if fields and b"data" in fields: - data = fields[b"data"] - message = TaskMessage.loads(data) - - delivery_tag = ( - msg_id.decode() - if isinstance(msg_id, bytes) - else msg_id - ) - - yield TaskEnvelope( - message=message, - delivery_tag=delivery_tag, - raw_data=data, - ) - - except redis.ResponseError as e: - if "NOGROUP" in str(e): - await 
self._ensure_consumer_group(queue) - else: - raise + # Run delayed-task promotion in the background so short-countdown retries + # are moved to the stream every ~100 ms, independently of block_ms. + delayed_mover = asyncio.create_task(self._delayed_mover_loop(queue)) + try: + while self._running: + # Periodically check for stale messages to reclaim (at-least-once) + now = asyncio.get_running_loop().time() + if ( + self.delivery_mode == DeliveryMode.AT_LEAST_ONCE + and now - last_claim_check > 10 + ): + stale = await self._claim_stale_messages(queue) + for envelope in stale: + yield envelope + last_claim_check = now + + try: + # XREADGROUP: read new messages for this consumer. + # Use ">" to read only messages not yet delivered to any consumer. + result = await self._client.xreadgroup( + groupname=self.consumer_group, + consumername=self.consumer_name, + streams={stream_key: ">"}, + count=1, + block=self.block_ms, + ) + + if result: + for stream_name, messages in result: + for msg_id, fields in messages: + if fields and b"data" in fields: + data = fields[b"data"] + message = TaskMessage.loads(data) + + delivery_tag = ( + msg_id.decode() + if isinstance(msg_id, bytes) + else msg_id + ) + + yield TaskEnvelope( + message=message, + delivery_tag=delivery_tag, + raw_data=data, + ) + + except redis.ResponseError as e: + if "NOGROUP" in str(e): + await self._ensure_consumer_group(queue) + else: + raise + finally: + delayed_mover.cancel() + with contextlib.suppress(asyncio.CancelledError): + await delayed_mover async def ack(self, envelope: TaskEnvelope, queue: str = DEFAULT_QUEUE) -> None: """Acknowledge successful processing (only meaningful for at-least-once).""" @@ -527,9 +622,13 @@ async def cleanup_stale_clients( if not self._client: return 0 - consumers_info = await self._client.xinfo_consumers( - self._stream_key(queue), self.consumer_group - ) + try: + consumers_info = await self._client.xinfo_consumers( + self._stream_key(queue), self.consumer_group + ) + except 
redis.ResponseError: + # Consumer group may not exist yet + return 0 removed = 0 for consumer in consumers_info: diff --git a/src/chicory/config.py b/src/chicory/config.py index 82bfa36..e41660e 100644 --- a/src/chicory/config.py +++ b/src/chicory/config.py @@ -84,6 +84,17 @@ class RedisBrokerConfig(BaseBrokerConfig): description="Max DLQ size (None for unlimited)", ) + mover_min_sleep_ms: float = Field( + default=50.0, + ge=0, + description="Minimum sleep time in ms for mover thread when not idle", + ) + mover_max_sleep_ms: float = Field( + default=1000.0, + ge=50, + description="Maximum sleep time in ms for mover thread when idling", + ) + key_prefix: str = Field( default="chicory", description="Prefix for all Redis keys", diff --git a/src/chicory/result.py b/src/chicory/result.py index 1695746..47b7fda 100644 --- a/src/chicory/result.py +++ b/src/chicory/result.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import time from typing import TYPE_CHECKING, Generic, cast from chicory.exceptions import BackendNotConfiguredError @@ -47,7 +48,7 @@ async def get( """ backend = self._ensure_backend() - elapsed = 0.0 + deadline = time.monotonic() + timeout if timeout is not None else None while True: result = await backend.get_result(self.task_id) @@ -60,13 +61,12 @@ async def get( case _: pass - if timeout is not None and elapsed >= timeout: + if deadline is not None and time.monotonic() >= deadline: raise TimeoutError( f"Task {self.task_id} did not complete in {timeout}s" ) await asyncio.sleep(poll_interval) - elapsed += poll_interval async def state(self) -> TaskState: """Get the current task state. 
diff --git a/src/chicory/worker.py b/src/chicory/worker.py index 8989b21..01be645 100644 --- a/src/chicory/worker.py +++ b/src/chicory/worker.py @@ -134,9 +134,6 @@ async def stop(self, timeout: float | None = None) -> None: If None, uses config.shutdown_timeout """ if not self._running: - self._logger.warning( - "Worker is not running", extra={"worker_id": self.worker_id} - ) return if timeout is None: @@ -152,6 +149,12 @@ async def stop(self, timeout: float | None = None) -> None: with contextlib.suppress(asyncio.CancelledError): await self._heartbeat_task + # Stop cleanup loop + if self._cleanup_task: + self._cleanup_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._cleanup_task + # Wait for consume loop to finish if self._consume_task: try: @@ -191,7 +194,7 @@ async def _heartbeat_loop(self) -> None: """ Periodic heartbeat sender. """ - self._logger.info( + self._logger.debug( f"Starting heartbeat loop (interval={self.heartbeat_interval}s)", extra={"worker_id": self.worker_id}, ) @@ -205,12 +208,12 @@ async def _heartbeat_loop(self) -> None: "Heartbeat loop cancelled", extra={"worker_id": self.worker_id} ) finally: - self._logger.info( + self._logger.debug( "Heartbeat loop exited", extra={"worker_id": self.worker_id} ) async def _cleanup_loop(self) -> None: - self._logger.info( + self._logger.debug( f"Starting cleanup loop (interval={self.cleanup_interval}s)", extra={"worker_id": self.worker_id}, ) @@ -218,29 +221,35 @@ async def _cleanup_loop(self) -> None: try: while self._running: await asyncio.sleep(self.cleanup_interval) - removed_backends = ( - await self.app.backend.cleanup_stale_clients( - self.stale_workers_timeout + try: + removed_backends = ( + await self.app.backend.cleanup_stale_clients( + self.stale_workers_timeout + ) + if self.app.backend is not None + else 0 + ) + removed_brokers = await self.app.broker.cleanup_stale_clients( + DEFAULT_QUEUE, self.stale_workers_timeout + ) + self._logger.info( + "Cleanup completed: 
removed stale backends: %s, removed stale " + "brokers: %s", + removed_backends, + removed_brokers, + extra={"worker_id": self.worker_id}, + ) + except Exception: + self._logger.exception( + "Cleanup iteration failed, will retry next interval", + extra={"worker_id": self.worker_id}, ) - if self.app.backend is not None - else 0 - ) - removed_brokers = await self.app.broker.cleanup_stale_clients( - DEFAULT_QUEUE, self.stale_workers_timeout - ) - self._logger.info( - "Cleanup completed: removed stale backends: %s, removed stale " - "brokers: %s", - removed_backends, - removed_brokers, - extra={"worker_id": self.worker_id}, - ) except asyncio.CancelledError: self._logger.debug( "Cleanup loop cancelled", extra={"worker_id": self.worker_id} ) finally: - self._logger.info( + self._logger.debug( "Cleanup loop exited", extra={"worker_id": self.worker_id} ) @@ -284,7 +293,7 @@ async def run(self) -> None: >>> await worker.run() # Blocks """ # Set up signal handlers (not supported on Windows with ProactorEventLoop) - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() signals_registered = False try: for sig in (signal.SIGINT, signal.SIGTERM): @@ -310,8 +319,8 @@ async def run(self) -> None: self._logger.info( "Keyboard interrupt received", extra={"worker_id": self.worker_id} ) - await self.stop() finally: + await self.stop() # Clean up signal handlers if they were registered if signals_registered: for sig in (signal.SIGINT, signal.SIGTERM): @@ -323,14 +332,26 @@ def _handle_shutdown(self) -> None: "Shutdown signal received", extra={"worker_id": self.worker_id} ) if self._running: - asyncio.create_task(self.stop()) + task = asyncio.create_task(self.stop()) + task.add_done_callback(self._on_stop_done) + + def _on_stop_done(self, task: asyncio.Task[None]) -> None: + """Log any exception raised by the graceful-shutdown task.""" + if not task.cancelled() and (exc := task.exception()): + self._logger.error( + "Error during graceful shutdown", + exc_info=exc, + 
extra={"worker_id": self.worker_id}, + ) async def _shutdown(self) -> None: """Internal cleanup.""" self._logger.info( "Shutting down worker...", extra={"worker_id": self.worker_id} ) - self._executor.shutdown(wait=True) + await asyncio.get_running_loop().run_in_executor( + None, self._executor.shutdown, True + ) await self.app.disconnect() self._logger.info( "Worker shutdown complete", extra={"worker_id": self.worker_id} @@ -469,7 +490,7 @@ async def _execute_task(self, task: Task[Any, Any], message: TaskMessage) -> Any if task.is_async: result = await task.fn(*args, **kwargs) else: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() result = await loop.run_in_executor( self._executor, functools.partial(task.fn, *args, **kwargs) )