Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import functools
import json
import logging
import threading

from pika import SelectConnection
from pika.channel import Channel as PikaChannel
Expand All @@ -14,63 +15,105 @@
from rabbitmq_client.single_threaded_consumer.schemas import ListenQueueConfig


class SingleThreadedMultiQueueListener(object):
class SingleThreadedMultiQueueListener:
"""
This is a single threaded multi queue listener, as opposed the other multi queue listener in the library.
this listener does not implement threads, which enables us to trigger async methods while handling the messages
This is a single-threaded, multi-queue RabbitMQ consumer that supports async message handlers.
It avoids threads for message processing, enabling async API calls (like PDF generation).
"""
def __init__(self, broker_config: BrokerConfig, listen_queue_configs: list[ListenQueueConfig]):
logging.debug('Creating connection object and registering callbacks')
self.connection = get_async_connection(broker_config=broker_config,
on_connection_open=self.on_connection_open,
on_connection_closed=self.on_connection_closed)
self.connection = get_async_connection(
broker_config=broker_config,
on_connection_open=self.on_connection_open,
on_connection_closed=self.on_connection_closed
)
self.listen_queue_configs = listen_queue_configs

def _start_loop(self):
asyncio.set_event_loop(self.loop)
self.loop.run_forever()

def on_connection_open(self, connection: SelectConnection):
connection.channel(on_open_callback=self.on_channel_open)

def on_connection_closed(self, connection, reason):
logging.exception('Connection closed: %s', reason)
raise Exception('Connection closed: %s', reason)
raise Exception(f'Connection closed: {reason}')

def on_channel_open(self, channel: PikaChannel):
channel.add_on_close_callback(self.on_channel_closed)

# Set prefetch to 1 to process one message at a time
channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT)

def on_message(ch: PikaChannel, method: PikaBasic.Deliver,
properties: PikaBasicProperties, body: bytes, args):
message = json.loads(body.decode('utf8'))
loop = asyncio.get_event_loop()
(handler,) = args
loop.run_until_complete(
handler.async_handle_message(
logging.debug("Received message: %s", body)
message = json.loads(body.decode('utf8'))

async def handle_and_ack():
logging.debug("Starting async handler for delivery_tag=%s", method.delivery_tag)
await handler.async_handle_message(
method=method, properties=properties, message=message
)
)
if ch.is_open:
logging.debug('Channel is open, acknowledging the message')
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
logging.warning('Channel closed unable to ack message')
if ch.is_open:
logging.debug('Acknowledging message with delivery_tag=%s', method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
logging.warning('Channel closed before ack')

# Submit task to background asyncio loop
self.loop.call_soon_threadsafe(lambda: asyncio.ensure_future(handle_and_ack(), loop=self.loop))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use single threaded multi queue listener with a consumer that is listening to at-least two queues, and they are both requiring DB connection, push jobs in both queues, at-least 2 jobs in each queue and verify all the 4 jobs are completed. because our DB connection is not thread safe, hence we had taken the earlier approach.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking again

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image image image


channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT)
for listen_queue_config in self.listen_queue_configs:
queue_name: str = listen_queue_config.name
on_message_callback = functools.partial(on_message, args=(listen_queue_config.handler,))
channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, auto_ack=False)
logging.info("Consuming from queue: %s", queue_name)

def on_channel_closed(self, ch, reason):
logging.exception('Channel %i was closed: %s', ch, reason)
raise Exception('Channel %i was closed: %s', ch, reason)
raise Exception(f'Channel {ch} was closed: {reason}')

def listen(self):
try:
logging.info('Starting background asyncio event loop for async message handlers')
# Set up a
self.loop = asyncio.new_event_loop()
self.loop_thread = threading.Thread(target=self._start_loop, daemon=True)
self.loop_thread.start()

logging.info('Starting IO Loop to listen for messages')
set_consumer_status(is_healthy=True)
self.connection.ioloop.start()
except KeyboardInterrupt:
logging.info('Keyboard Interrupt Closing')
self.connection.close()
self.stop()
except Exception as ex:
set_consumer_status(is_healthy=False)
self.connection.close()
self.stop()
raise ex

def stop(self):
logging.info("Stopping consumer...")

# Stop pika I/O loop if running
try:
if self.connection and not self.connection.is_closed:
self.connection.close()
if self.connection and self.connection.ioloop.is_running:
self.connection.ioloop.stop()
except Exception as e:
logging.warning(f"Error stopping pika loop: {e}")

# Stop asyncio loop
if self.loop and self.loop.is_running():
self.loop.call_soon_threadsafe(self.loop.stop)

# Wait for background loop thread to finish
if hasattr(self, "loop_thread") and self.loop_thread.is_alive():
self.loop_thread.join(timeout=5)

logging.info("Consumer stopped.")