Skip to content

Commit f379eef

Browse files
committed
Add celery_app parameter to job_queue_size for priority queue support
Fixes RabbitMQ PRECONDITION_FAILED error when querying queues configured with custom arguments like x-max-priority. The celery_app parameter allows extracting queue arguments from the app's task_queues configuration and passing them to queue_declare(). The celery_app and broker_url parameters are mutually exclusive - a ValueError is raised if both are provided.
1 parent 4669fcb commit f379eef

2 files changed

Lines changed: 226 additions & 37 deletions

File tree

hirefire_resource/macro/celery.py

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,26 @@ class ChannelError(Exception):
2525
from hirefire_resource.errors import MissingQueueError
2626

2727

28+
def _get_queue_arguments_from_app(app, queues):
29+
"""
30+
Extract queue arguments from Celery app configuration for specified queues.
31+
32+
Args:
33+
app: Celery app instance with task_queues configuration
34+
queues: List of queue names to extract arguments for
35+
36+
Returns:
37+
dict: Mapping of queue name to queue_arguments dict
38+
"""
39+
queue_args = {}
40+
task_queues = getattr(app.conf, "task_queues", None) or []
41+
for q in task_queues:
42+
queue_name = getattr(q, "name", None)
43+
if queue_name and queue_name in queues:
44+
queue_args[queue_name] = getattr(q, "queue_arguments", None)
45+
return queue_args
46+
47+
2848
def mitigate_connection_reset_error(retries=10, delay=1):
2949
"""
3050
Decorator to retry a function when ConnectionResetError occurs.
@@ -210,7 +230,7 @@ async def async_job_queue_latency(*queues, broker_url=None):
210230

211231

212232
@mitigate_connection_reset_error()
213-
def job_queue_size(*queues, broker_url=None):
233+
def job_queue_size(*queues, broker_url=None, celery_app=None):
214234
"""
215235
Calculates the total job queue size across the specified queues using Celery with either Redis
216236
or RabbitMQ (AMQP) as the broker.
@@ -229,22 +249,31 @@ def job_queue_size(*queues, broker_url=None):
229249
using a workaround, such as a separate queue for scheduled tasks that forwards tasks ready
230250
to run to the relevant regular queues. When using RabbitMQ (AMQP), consider using the
231251
Delayed Message Plugin.
252+
- For RabbitMQ queues with custom arguments (e.g., x-max-priority for priority queues),
253+
pass your configured Celery app via the `celery_app` parameter. This allows the function
254+
to extract and use the correct queue arguments when querying RabbitMQ.
232255
233256
Args:
234257
*queues (str): Names of the queues for size measurement.
235-
broker_url (str, optional): The broker URL. Defaults in the following order:
258+
broker_url (str, optional): The broker URL. Cannot be used together with `celery_app`.
259+
Defaults in the following order:
236260
- Passed argument `broker_url`.
237261
- Environment variables `AMQP_URL`, `RABBITMQ_URL`, `RABBITMQ_BIGWIG_URL`,
238262
`CLOUDAMQP_URL`, `REDIS_TLS_URL`, `REDIS_URL`, `REDISTOGO_URL`, `REDISCLOUD_URL`,
239263
`OPENREDIS_URL`.
240264
- "amqp://guest:guest@localhost:5672" if AMQP is available, otherwise
241265
"redis://localhost:6379/0".
266+
celery_app (Celery, optional): A configured Celery app instance. Cannot be used together
267+
with `broker_url`. When provided, the function uses this app's connection and extracts
268+
queue arguments from celery_app.conf.task_queues. This is required for RabbitMQ queues
269+
with custom arguments like x-max-priority.
242270
243271
Returns:
244272
int: The cumulative job queue size across the specified queues.
245273
246274
Raises:
247275
MissingQueueError: If no queue names are provided.
276+
ValueError: If both `broker_url` and `celery_app` are provided.
248277
249278
Examples:
250279
>>> job_queue_size("celery")
@@ -255,43 +284,57 @@ def job_queue_size(*queues, broker_url=None):
255284
42
256285
>>> job_queue_size("celery", broker_url="redis://localhost:6379/0")
257286
42
287+
>>> # For priority queues, pass your configured Celery app:
288+
>>> job_queue_size("celery", celery_app=celery_app)
289+
42
258290
"""
259291
if not queues:
260292
raise MissingQueueError()
261293

262-
broker_url = (
263-
broker_url
264-
or os.environ.get("AMQP_URL")
265-
or os.environ.get("RABBITMQ_URL")
266-
or os.environ.get("RABBITMQ_BIGWIG_URL")
267-
or os.environ.get("CLOUDAMQP_URL")
268-
or os.environ.get("REDIS_TLS_URL")
269-
or os.environ.get("REDIS_URL")
270-
or os.environ.get("REDISTOGO_URL")
271-
or os.environ.get("REDISCLOUD_URL")
272-
or os.environ.get("OPENREDIS_URL")
273-
)
274-
275-
if not broker_url:
276-
if AMQP_AVAILABLE:
277-
broker_url = "amqp://guest:guest@localhost:5672"
278-
else:
279-
broker_url = "redis://localhost:6379/0"
280-
281-
app = Celery(broker=broker_url)
294+
if celery_app is not None and broker_url is not None:
295+
raise ValueError(
296+
"Cannot specify both 'celery_app' and 'broker_url'. "
297+
"Use 'celery_app' to pass your configured Celery app (recommended for priority queues), "
298+
"or 'broker_url' for simple setups."
299+
)
300+
301+
if celery_app is None:
302+
broker_url = (
303+
broker_url
304+
or os.environ.get("AMQP_URL")
305+
or os.environ.get("RABBITMQ_URL")
306+
or os.environ.get("RABBITMQ_BIGWIG_URL")
307+
or os.environ.get("CLOUDAMQP_URL")
308+
or os.environ.get("REDIS_TLS_URL")
309+
or os.environ.get("REDIS_URL")
310+
or os.environ.get("REDISTOGO_URL")
311+
or os.environ.get("REDISCLOUD_URL")
312+
or os.environ.get("OPENREDIS_URL")
313+
)
314+
315+
if not broker_url:
316+
if AMQP_AVAILABLE:
317+
broker_url = "amqp://guest:guest@localhost:5672"
318+
else:
319+
broker_url = "redis://localhost:6379/0"
320+
321+
celery_app = Celery(broker=broker_url)
322+
323+
# Extract queue arguments from app configuration (for priority queues, etc.)
324+
queue_args = _get_queue_arguments_from_app(celery_app, queues)
282325

283326
try:
284-
with app.connection_or_acquire() as connection:
327+
with celery_app.connection_or_acquire() as connection:
285328
with connection.channel() as channel:
286-
worker_task_count = _job_queue_size_worker(app, queues)
287-
broker_task_count = _job_queue_size_broker(channel, queues)
329+
worker_task_count = _job_queue_size_worker(celery_app, queues)
330+
broker_task_count = _job_queue_size_broker(channel, queues, queue_args)
288331
return worker_task_count + broker_task_count
289332

290333
except OperationalError:
291334
return 0
292335

293336

294-
async def async_job_queue_size(*queues, broker_url=None):
337+
async def async_job_queue_size(*queues, broker_url=None, celery_app=None):
295338
"""
296339
Asynchronously calculates the total job queue size across the specified queues using Celery with
297340
either Redis or RabbitMQ (AMQP) as the broker.
@@ -311,22 +354,29 @@ async def async_job_queue_size(*queues, broker_url=None):
311354
using a workaround, such as a separate queue for scheduled tasks that forwards tasks ready
312355
to run to the relevant regular queues. When using RabbitMQ (AMQP), consider using the
313356
Delayed Message Plugin.
357+
- For RabbitMQ queues with custom arguments (e.g., x-max-priority for priority queues),
358+
pass your configured Celery app via the `celery_app` parameter.
314359
315360
Args:
316361
*queues (str): Names of the queues for size measurement.
317-
broker_url (str, optional): The broker URL. Defaults in the following order:
362+
broker_url (str, optional): The broker URL. Cannot be used together with `celery_app`.
363+
Defaults in the following order:
318364
- Passed argument `broker_url`.
319365
- Environment variables `AMQP_URL`, `RABBITMQ_URL`, `RABBITMQ_BIGWIG_URL`,
320366
`CLOUDAMQP_URL`, `REDIS_TLS_URL`, `REDIS_URL`, `REDISTOGO_URL`, `REDISCLOUD_URL`,
321367
`OPENREDIS_URL`.
322368
- "amqp://guest:guest@localhost:5672" if AMQP is available, otherwise
323369
"redis://localhost:6379/0".
370+
celery_app (Celery, optional): A configured Celery app instance. Cannot be used together
371+
with `broker_url`. When provided, the function uses this app's connection and extracts
372+
queue arguments from celery_app.conf.task_queues.
324373
325374
Returns:
326375
int: The cumulative job queue size across the specified queues.
327376
328377
Raises:
329378
MissingQueueError: If no queue names are provided.
379+
ValueError: If both `broker_url` and `celery_app` are provided.
330380
331381
Examples:
332382
>>> await async_job_queue_size("celery")
@@ -335,11 +385,13 @@ async def async_job_queue_size(*queues, broker_url=None):
335385
85
336386
>>> await async_job_queue_size("celery", broker_url="amqp://user:password@host:5672")
337387
42
338-
>>> await async_job_queue_size("celery", broker_url="redis://localhost:6379/0")
388+
>>> await async_job_queue_size("celery", celery_app=celery_app)
339389
42
340390
"""
341391
loop = asyncio.get_event_loop()
342-
func = functools.partial(job_queue_size, *queues, broker_url=broker_url)
392+
func = functools.partial(
393+
job_queue_size, *queues, broker_url=broker_url, celery_app=celery_app
394+
)
343395
return await loop.run_in_executor(None, func)
344396

345397

@@ -399,22 +451,28 @@ def _job_queue_size_worker(app, queues):
399451
return sum(worker_data.get(queue, 0) for queue in queues)
400452

401453

402-
def _job_queue_size_broker(channel, queues):
454+
def _job_queue_size_broker(channel, queues, queue_args=None):
    """
    Sum pending-message counts for *queues* directly on the broker.

    Dispatches on the channel type: kombu's Redis virtual channel exposes a
    ``_size`` attribute, anything else is treated as RabbitMQ/AMQP. For the
    AMQP path the optional ``queue_args`` mapping (queue name ->
    queue_arguments) is forwarded so passive declares match queues that were
    created with custom arguments (e.g. x-max-priority).
    """
    args = queue_args if queue_args is not None else {}
    if hasattr(channel, "_size"):
        return sum(_job_queue_size_redis(channel, name) for name in queues)
    return sum(
        _job_queue_size_rabbitmq(channel, name, args.get(name)) for name in queues
    )

410466

411467
def _job_queue_size_redis(channel, queue):
412468
return channel.client.llen(queue)
413469

414470

415-
def _job_queue_size_rabbitmq(channel, queue):
471+
def _job_queue_size_rabbitmq(channel, queue, arguments=None):
416472
try:
417-
return channel.queue_declare(queue=queue, passive=True).message_count
473+
return channel.queue_declare(
474+
queue=queue, passive=True, arguments=arguments
475+
).message_count
418476
except ChannelError:
419477
return 0
420478

tests/hirefire_resource/macro/test_celery.py

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import math
22
from datetime import datetime, timedelta, timezone
3-
from unittest.mock import patch
43

54
import pytest
65
from celery import Celery
6+
from kombu import Queue
77

88
from hirefire_resource.errors import MissingQueueError
99
from hirefire_resource.macro.celery import (
@@ -215,3 +215,134 @@ async def test_job_queue_size_with_jobs_async(celery_app):
215215
)
216216
== 10
217217
)
218+
219+
220+
# Tests for priority queues (RabbitMQ only)
221+
# This reproduces the customer issue where queues are configured with x-max-priority
222+
223+
224+
@pytest.fixture
def priority_celery_app():
    """Build a Celery app whose queue is declared with x-max-priority,
    mirroring a production priority-queue configuration."""
    app = Celery(broker="amqp://guest:guest@localhost:5672")
    app.conf.task_queues = [
        Queue("priority_queue", queue_arguments={"x-max-priority": 10}),
    ]
    app.conf.task_queue_max_priority = 10
    app.conf.task_default_priority = 5
    return app
239+
240+
241+
@pytest.fixture
def setup_priority_queue(priority_celery_app):
    """Declare a fresh RabbitMQ queue with x-max-priority; drop it after the test."""
    with priority_celery_app.connection_or_acquire() as connection:
        ch = connection.default_channel
        # Start from a clean slate in case a previous run left the queue behind.
        ch.queue_delete(queue="priority_queue")
        ch.queue_declare(
            queue="priority_queue",
            durable=True,
            auto_delete=False,
            arguments={"x-max-priority": 10},
        )

    yield priority_celery_app

    # Teardown: remove the queue so state does not leak between tests.
    with priority_celery_app.connection_or_acquire() as connection:
        connection.default_channel.queue_delete(queue="priority_queue")
262+
263+
264+
def test_job_queue_size_priority_queue_with_broker_url(setup_priority_queue):
    """
    Query a priority queue through ``broker_url`` alone (no queue arguments).

    NOTE(review): in this test environment (RabbitMQ 4.2.2, py-amqp 5.3.1) a
    passive declare does not validate arguments, so this passes; other
    RabbitMQ versions or configurations may return PRECONDITION_FAILED.
    For guaranteed compatibility with priority queues, use the ``celery_app``
    parameter instead.
    """
    app = setup_priority_queue

    # Enqueue two tasks on the priority queue.
    for _ in range(2):
        app.send_task("test_task", queue="priority_queue")

    # broker_url path: no queue arguments are passed to the declare.
    assert job_queue_size("priority_queue", broker_url=app.conf.broker_url) == 2
286+
287+
288+
def test_job_queue_size_priority_queue_with_celery_app_returns_correct_count(
    setup_priority_queue,
):
    """
    Passing the configured Celery app lets job_queue_size extract the queue's
    x-max-priority arguments and forward them to queue_declare — the
    recommended path for priority queues.
    """
    app = setup_priority_queue

    # Enqueue three tasks on the priority queue.
    for _ in range(3):
        app.send_task("test_task", queue="priority_queue")

    assert job_queue_size("priority_queue", celery_app=app) == 3
310+
311+
312+
def test_job_queue_size_raises_error_when_both_broker_url_and_celery_app_provided(
    setup_priority_queue,
):
    """``broker_url`` and ``celery_app`` are mutually exclusive."""
    with pytest.raises(ValueError, match="Cannot specify both"):
        job_queue_size(
            "priority_queue",
            broker_url="amqp://guest:guest@localhost:5672",
            celery_app=setup_priority_queue,
        )
329+
330+
331+
@pytest.mark.asyncio
async def test_async_job_queue_size_raises_error_when_both_broker_url_and_celery_app_provided(
    setup_priority_queue,
):
    """Async variant: ``broker_url`` and ``celery_app`` are mutually exclusive."""
    with pytest.raises(ValueError, match="Cannot specify both"):
        await async_job_queue_size(
            "priority_queue",
            broker_url="amqp://guest:guest@localhost:5672",
            celery_app=setup_priority_queue,
        )

0 commit comments

Comments
 (0)