From 3eff31f467a8a99a60e478a1dc051361f730f33d Mon Sep 17 00:00:00 2001 From: klimick Date: Thu, 4 Dec 2025 09:47:15 +0300 Subject: [PATCH 1/2] Allow handle deliveries concurrently --- src/Internal/Delivery/Consumer.php | 3 +- tests/AmqpTest.php | 49 +++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/Internal/Delivery/Consumer.php b/src/Internal/Delivery/Consumer.php index 90bcf3b..c94afe6 100644 --- a/src/Internal/Delivery/Consumer.php +++ b/src/Internal/Delivery/Consumer.php @@ -4,6 +4,7 @@ namespace Thesis\Amqp\Internal\Delivery; +use Revolt\EventLoop; use Thesis\Amqp\Channel; use Thesis\Amqp\DeliveryMessage; @@ -23,7 +24,7 @@ public function __construct(DeliverySupervisor $supervisor) $supervisor->addConsumeListener(static function (DeliveryMessage $delivery, Channel $channel) use (&$consumers): void { $consumer = $consumers[$delivery->consumerTag] ?? null; if ($consumer !== null) { - $consumer($delivery, $channel); + EventLoop::queue(static fn() => $consumer($delivery, $channel)); } }); diff --git a/tests/AmqpTest.php b/tests/AmqpTest.php index b5a73fe..6cdd307 100644 --- a/tests/AmqpTest.php +++ b/tests/AmqpTest.php @@ -16,6 +16,7 @@ use Thesis\Time\TimeSpan; use function Amp\async; use function Amp\delay; +use function PHPUnit\Framework\assertEquals; #[CoversClass(Client::class)] #[CoversClass(Channel::class)] @@ -581,7 +582,7 @@ public function testPublishConsume(string $exchange, string $queue, string $rout $channel->consume(static function (DeliveryMessage $delivery) use (&$consumedMessages, $messageCount, $deferred): void { $consumedMessages[$delivery->exchange][] = $delivery->message->body; $delivery->ack(); - if (\count($consumedMessages[$delivery->exchange]) === $messageCount) { + if (\count($consumedMessages[$delivery->exchange]) === $messageCount && !$deferred->isComplete()) { $deferred->complete($consumedMessages); } }, $queue); @@ -592,6 +593,52 @@ public function testPublishConsume(string $exchange, string $queue, string $rout $channel->close(); } + /** + * @param non-empty-string $exchange + * @param non-empty-string $queue + * @param non-empty-string $routingKey + */ + #[TestWith(['events', 'events.orders', 'orders'])] + public function testConcurrentConsume(string $exchange, string $queue, string $routingKey): void + { + $channel = $this->client->channel(); + $channel->qos(prefetchCount: 2); + + $channel->queueUnbind($queue, $exchange, $routingKey); + $channel->exchangeDelete($exchange); + $channel->queueDelete($queue); + + $channel->queueDeclare($queue, autoDelete: true); + $channel->exchangeDeclare($exchange, autoDelete: true); + $channel->queueBind($queue, $exchange, $routingKey); + + $channel->publish(new Message('order#1'), $exchange, $routingKey); + $channel->publish(new Message('order#2'), $exchange, $routingKey); + + $consumed = []; + $deferred = new DeferredFuture(); + + $consumerTag = $channel->consume(static function (DeliveryMessage $delivery) use (&$consumed, $deferred): void { + if ($delivery->message->body === 'order#1') { + $deferred->getFuture()->await(); + } + + if ($delivery->message->body === 'order#2') { + $deferred->complete(); + } + + $consumed[] = $delivery->message->body; + $delivery->ack(); + }, $queue); + + $deferred->getFuture()->await(); + + $channel->cancel($consumerTag); + $channel->close(); + + assertEquals(['order#2', 'order#1'], $consumed); + } + public function testPublishConsumeBatch(): void { $channel = $this->client->channel(); From 78fbc7125e95740cf1014ea1a956086ecdab8a22 Mon Sep 17 00:00:00 2001 From: klimick Date: Thu, 4 Dec 2025 14:01:26 +0300 Subject: [PATCH 2/2] cs-fix --- tests/AmqpTest.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/AmqpTest.php b/tests/AmqpTest.php index 6cdd307..720a732 100644 --- a/tests/AmqpTest.php +++ b/tests/AmqpTest.php @@ -16,7 +16,6 @@ use Thesis\Time\TimeSpan; use function Amp\async; use function Amp\delay; -use function PHPUnit\Framework\assertEquals; #[CoversClass(Client::class)] #[CoversClass(Channel::class)] @@ -636,7 +635,7 @@ public function testConcurrentConsume(string $exchange, string $queue, string $r $channel->cancel($consumerTag); $channel->close(); - assertEquals(['order#2', 'order#1'], $consumed); + self::assertEquals(['order#2', 'order#1'], $consumed); } public function testPublishConsumeBatch(): void