diff --git a/src/Internal/Delivery/Consumer.php b/src/Internal/Delivery/Consumer.php index 90bcf3b..c94afe6 100644 --- a/src/Internal/Delivery/Consumer.php +++ b/src/Internal/Delivery/Consumer.php @@ -4,6 +4,7 @@ namespace Thesis\Amqp\Internal\Delivery; +use Revolt\EventLoop; use Thesis\Amqp\Channel; use Thesis\Amqp\DeliveryMessage; @@ -23,7 +24,7 @@ public function __construct(DeliverySupervisor $supervisor) $supervisor->addConsumeListener(static function (DeliveryMessage $delivery, Channel $channel) use (&$consumers): void { $consumer = $consumers[$delivery->consumerTag] ?? null; if ($consumer !== null) { - $consumer($delivery, $channel); + EventLoop::queue(static fn() => $consumer($delivery, $channel)); } }); diff --git a/tests/AmqpTest.php b/tests/AmqpTest.php index b5a73fe..720a732 100644 --- a/tests/AmqpTest.php +++ b/tests/AmqpTest.php @@ -581,7 +581,7 @@ public function testPublishConsume(string $exchange, string $queue, string $rout $channel->consume(static function (DeliveryMessage $delivery) use (&$consumedMessages, $messageCount, $deferred): void { $consumedMessages[$delivery->exchange][] = $delivery->message->body; $delivery->ack(); - if (\count($consumedMessages[$delivery->exchange]) === $messageCount) { + if (\count($consumedMessages[$delivery->exchange]) === $messageCount && !$deferred->isComplete()) { $deferred->complete($consumedMessages); } }, $queue); @@ -592,6 +592,52 @@ public function testPublishConsume(string $exchange, string $queue, string $rout $channel->close(); } + /** + * @param non-empty-string $exchange + * @param non-empty-string $queue + * @param non-empty-string $routingKey + */ + #[TestWith(['events', 'events.orders', 'orders'])] + public function testConcurrentConsume(string $exchange, string $queue, string $routingKey): void + { + $channel = $this->client->channel(); + $channel->qos(prefetchCount: 2); + + $channel->queueUnbind($queue, $exchange, $routingKey); + $channel->exchangeDelete($exchange); + $channel->queueDelete($queue); + + $channel->queueDeclare($queue, autoDelete: true); + $channel->exchangeDeclare($exchange, autoDelete: true); + $channel->queueBind($queue, $exchange, $routingKey); + + $channel->publish(new Message('order#1'), $exchange, $routingKey); + $channel->publish(new Message('order#2'), $exchange, $routingKey); + + $consumed = []; + $deferred = new DeferredFuture(); + + $consumerTag = $channel->consume(static function (DeliveryMessage $delivery) use (&$consumed, $deferred): void { + if ($delivery->message->body === 'order#1') { + $deferred->getFuture()->await(); + } + + if ($delivery->message->body === 'order#2') { + $deferred->complete(); + } + + $consumed[] = $delivery->message->body; + $delivery->ack(); + }, $queue); + + $deferred->getFuture()->await(); + + $channel->cancel($consumerTag); + $channel->close(); + + self::assertEquals(['order#2', 'order#1'], $consumed); + } + public function testPublishConsumeBatch(): void { $channel = $this->client->channel();