From e7b0ec59fd0b02279b07f29c71db9dba2a790e0e Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Fri, 1 May 2026 20:15:20 +0300 Subject: [PATCH 1/4] Improve push middleware dispatch --- .../Push/PushMiddlewareDispatcher.php | 26 +++++- src/Queue.php | 79 +++++++++---------- tests/Integration/MiddlewareTest.php | 3 +- 3 files changed, 63 insertions(+), 45 deletions(-) diff --git a/src/Middleware/Push/PushMiddlewareDispatcher.php b/src/Middleware/Push/PushMiddlewareDispatcher.php index 9a8379a8..97a11f01 100644 --- a/src/Middleware/Push/PushMiddlewareDispatcher.php +++ b/src/Middleware/Push/PushMiddlewareDispatcher.php @@ -46,13 +46,12 @@ public function dispatch( /** * Returns new instance with middleware handlers replaced with the ones provided. - * The last specified handler will be executed first. * * @param array[]|callable[]|MiddlewarePushInterface[]|string[] $middlewareDefinitions Each array element is: * * - A name of a middleware class. The middleware instance will be obtained from container executed. - * - A callable with `function(ServerRequestInterface $request, RequestHandlerInterface $handler): - * ResponseInterface` signature. + * - A callable with `function(MessageInterface $message, MessageHandlerPushInterface $handler): + * MessageInterface` signature. * - A "callable-like" array in format `[FooMiddleware::class, 'index']`. `FooMiddleware` instance will * be created and `index()` method will be executed. * - A function returning a middleware. The middleware returned will be executed. @@ -73,6 +72,27 @@ public function withMiddlewares(array $middlewareDefinitions): self return $instance; } + /** + * Returns a new instance with additional middleware handlers added to the existing ones. + * + * @param array[]|callable[]|MiddlewarePushInterface[]|string[] $middlewareDefinitions Each array element is: + * + * - A name of a middleware class. The middleware instance will be obtained from container executed. + * - A callable with `function(MessageInterface $message, MessageHandlerPushInterface $handler): + * MessageInterface` signature. + * - A "callable-like" array in format `[FooMiddleware::class, 'index']`. `FooMiddleware` instance will + * be created and `index()` method will be executed. + * - A function returning a middleware. The middleware returned will be executed. + * + * For callables typed parameters are automatically injected using dependency injection container. + * + * @return self New instance of the {@see PushMiddlewareDispatcher} + */ + public function withMiddlewaresAdded(array $middlewareDefinitions): self + { + return $this->withMiddlewares([...array_reverse($this->middlewareDefinitions), ...$middlewareDefinitions]); + } + /** * @return bool Whether there are middleware defined in the dispatcher. */ diff --git a/src/Queue.php b/src/Queue.php index 3b4b39db..15b8be19 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -21,12 +21,12 @@ final class Queue implements QueueInterface { /** - * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] + * @var array Queue-specific middleware definitions. */ private array $middlewareDefinitions; /** - * @var MessageHandlerPushInterface $finalPushHandler The final push handler in the middleware chain, responsible + * @var MessageHandlerPushInterface The final push handler in the middleware chain, responsible * for actually sending the message. Uses {@see SynchronousPushHandler} in synchronous mode or * {@see AdapterPushHandler} otherwise. */ @@ -34,20 +34,42 @@ final class Queue implements QueueInterface private string $name; + /** + * @var PushMiddlewareDispatcher The base middleware dispatcher. + */ + private readonly PushMiddlewareDispatcher $baseDispatcher; + + /** + * @var PushMiddlewareDispatcher The dispatcher used for push messages, combining base dispatcher middleware with + * queue-specific middleware. + */ + private PushMiddlewareDispatcher $dispatcher; + + /** + * @param WorkerInterface $worker The worker that processes messages. + * @param LoopInterface $loop The loop for controlling message processing. + * @param LoggerInterface $logger The logger for debug and informational messages. + * @param PushMiddlewareDispatcher $dispatcher The middleware dispatcher. + * @param AdapterInterface|null $adapter The message adapter (`null` for synchronous mode). + * @param string|BackedEnum $name The queue name. + * @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions Queue-specific middleware + * definitions. + */ public function __construct( private readonly WorkerInterface $worker, private readonly LoopInterface $loop, private readonly LoggerInterface $logger, - private readonly PushMiddlewareDispatcher $pushMiddlewareDispatcher, + PushMiddlewareDispatcher $dispatcher, private readonly ?AdapterInterface $adapter = null, string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE, MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions, ) { + $this->baseDispatcher = $dispatcher; $this->name = StringNormalizer::normalize($name); - $this->middlewareDefinitions = $middlewareDefinitions; $this->finalPushHandler = $this->isSynchronous() ? new SynchronousPushHandler($worker, $this) : new AdapterPushHandler($this->adapter); + $this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions); } public function getName(): string @@ -62,10 +84,7 @@ public function push(MessageInterface $message): MessageInterface ['messageType' => $message->getType()], ); - $message = $this->pushMiddlewareDispatcher->dispatch( - $message, - $this->createPushHandler(), - ); + $message = $this->dispatcher->dispatch($message, $this->finalPushHandler); if ($this->isSynchronous()) { $this->logger->info( @@ -140,19 +159,26 @@ public function status(string|int $id): MessageStatus public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self { $instance = clone $this; - $instance->middlewareDefinitions = $middlewareDefinitions; - + $instance->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions); return $instance; } public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self { $instance = clone $this; - $instance->middlewareDefinitions = [...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)]; - + $instance->setMiddlewaresAndPrepareDispatcher([...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)]); return $instance; } + /** + * @param array $middlewareDefinitions + */ + private function setMiddlewaresAndPrepareDispatcher(array $middlewareDefinitions): void + { + $this->middlewareDefinitions = $middlewareDefinitions; + $this->dispatcher = $this->baseDispatcher->withMiddlewaresAdded($middlewareDefinitions); + } + private function handle(MessageInterface $message): bool { $this->worker->process($message, $this); @@ -160,35 +186,6 @@ private function handle(MessageInterface $message): bool return $this->loop->canContinue(); } - private function createPushHandler(): MessageHandlerPushInterface - { - return new class ( - $this->finalPushHandler, - $this->pushMiddlewareDispatcher, - $this->middlewareDefinitions, - ) implements MessageHandlerPushInterface { - public function __construct( - /** - * @var MessageHandlerPushInterface $finishHandler Final handler invoked after all middlewares are - * processed. - */ - private readonly MessageHandlerPushInterface $finishHandler, - private readonly PushMiddlewareDispatcher $dispatcher, - /** - * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] - */ - private readonly array $middlewares, - ) {} - - public function handlePush(MessageInterface $message): MessageInterface - { - return $this->dispatcher - ->withMiddlewares($this->middlewares) - ->dispatch($message, $this->finishHandler); - } - }; - } - /** * @psalm-assert-if-false !null $this->adapter */ diff --git a/tests/Integration/MiddlewareTest.php b/tests/Integration/MiddlewareTest.php index 1b1cc23e..7fccd65a 100644 --- a/tests/Integration/MiddlewareTest.php +++ b/tests/Integration/MiddlewareTest.php @@ -43,6 +43,7 @@ public function testFullStackPush(): void 'channel 1', 'channel 2', 'channel 3', + 'channel 4', ]; $pushMiddlewareDispatcher = new PushMiddlewareDispatcher( @@ -67,7 +68,7 @@ public function testFullStackPush(): void $queue = $queue ->withMiddlewares(new TestMiddleware('Won\'t be executed')) ->withMiddlewares(new TestMiddleware('channel 1'), new TestMiddleware('channel 2')) - ->withMiddlewaresAdded(new TestMiddleware('channel 3')); + ->withMiddlewaresAdded(new TestMiddleware('channel 3'), new TestMiddleware('channel 4')); $message = new Message('test', ['initial']); $messagePushed = $queue->push($message); From ab5102223f6282846d37a27cedc3d30663eece0f Mon Sep 17 00:00:00 2001 From: vjik <525501+vjik@users.noreply.github.com> Date: Fri, 1 May 2026 17:16:35 +0000 Subject: [PATCH 2/4] Apply PHP CS Fixer and Rector changes (CI) --- src/Queue.php | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index 15b8be19..5ce40a09 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -34,11 +34,6 @@ final class Queue implements QueueInterface private string $name; - /** - * @var PushMiddlewareDispatcher The base middleware dispatcher. - */ - private readonly PushMiddlewareDispatcher $baseDispatcher; - /** * @var PushMiddlewareDispatcher The dispatcher used for push messages, combining base dispatcher middleware with * queue-specific middleware. @@ -49,7 +44,7 @@ final class Queue implements QueueInterface * @param WorkerInterface $worker The worker that processes messages. * @param LoopInterface $loop The loop for controlling message processing. * @param LoggerInterface $logger The logger for debug and informational messages. - * @param PushMiddlewareDispatcher $dispatcher The middleware dispatcher. + * @param PushMiddlewareDispatcher $baseDispatcher The middleware dispatcher. * @param AdapterInterface|null $adapter The message adapter (`null` for synchronous mode). * @param string|BackedEnum $name The queue name. * @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions Queue-specific middleware @@ -59,12 +54,11 @@ public function __construct( private readonly WorkerInterface $worker, private readonly LoopInterface $loop, private readonly LoggerInterface $logger, - PushMiddlewareDispatcher $dispatcher, + private readonly PushMiddlewareDispatcher $baseDispatcher, private readonly ?AdapterInterface $adapter = null, string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE, MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions, ) { - $this->baseDispatcher = $dispatcher; $this->name = StringNormalizer::normalize($name); $this->finalPushHandler = $this->isSynchronous() ? new SynchronousPushHandler($worker, $this) From d8f45270314393c421f1c3546eac4be08258a47f Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Sat, 2 May 2026 15:35:35 +0200 Subject: [PATCH 3/4] fix(queue): create new final handler on queue clone --- src/Queue.php | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index 5ce40a09..871b96ec 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -60,12 +60,15 @@ public function __construct( MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions, ) { $this->name = StringNormalizer::normalize($name); - $this->finalPushHandler = $this->isSynchronous() - ? new SynchronousPushHandler($worker, $this) - : new AdapterPushHandler($this->adapter); + $this->finalPushHandler = $this->createFinalPushHandler(); $this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions); } + public function __clone() + { + $this->finalPushHandler = $this->createFinalPushHandler(); + } + public function getName(): string { return $this->name; @@ -187,4 +190,11 @@ private function isSynchronous(): bool { return $this->adapter === null; } + + private function createFinalPushHandler(): MessageHandlerPushInterface + { + return $this->isSynchronous() + ? new SynchronousPushHandler($this->worker, $this) + : new AdapterPushHandler($this->adapter); + } } From 50e07db0b04d63881c3cfd490a5cf63062432983 Mon Sep 17 00:00:00 2001 From: Sergei Predvoditelev Date: Sat, 2 May 2026 18:38:36 +0300 Subject: [PATCH 4/4] fix --- src/Queue.php | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index dfbab758..3f7d8f59 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -197,11 +197,4 @@ private function isSynchronous(): bool { return $this->adapter === null; } - - private function createFinalPushHandler(): MessageHandlerPushInterface - { - return $this->isSynchronous() - ? new SynchronousPushHandler($this->worker, $this) - : new AdapterPushHandler($this->adapter); - } }