diff --git a/src/Middleware/Push/PushMiddlewareDispatcher.php b/src/Middleware/Push/PushMiddlewareDispatcher.php index 9a8379a8..97a11f01 100644 --- a/src/Middleware/Push/PushMiddlewareDispatcher.php +++ b/src/Middleware/Push/PushMiddlewareDispatcher.php @@ -46,13 +46,12 @@ public function dispatch( /** * Returns new instance with middleware handlers replaced with the ones provided. - * The last specified handler will be executed first. * * @param array[]|callable[]|MiddlewarePushInterface[]|string[] $middlewareDefinitions Each array element is: * * - A name of a middleware class. The middleware instance will be obtained from container executed. - * - A callable with `function(ServerRequestInterface $request, RequestHandlerInterface $handler): - * ResponseInterface` signature. + * - A callable with `function(MessageInterface $message, MessageHandlerPushInterface $handler): + * MessageInterface` signature. * - A "callable-like" array in format `[FooMiddleware::class, 'index']`. `FooMiddleware` instance will * be created and `index()` method will be executed. * - A function returning a middleware. The middleware returned will be executed. @@ -73,6 +72,27 @@ public function withMiddlewares(array $middlewareDefinitions): self return $instance; } + /** + * Returns a new instance with additional middleware handlers added to the existing ones. + * + * @param array[]|callable[]|MiddlewarePushInterface[]|string[] $middlewareDefinitions Each array element is: + * + * - A name of a middleware class. The middleware instance will be obtained from container executed. + * - A callable with `function(MessageInterface $message, MessageHandlerPushInterface $handler): + * MessageInterface` signature. + * - A "callable-like" array in format `[FooMiddleware::class, 'index']`. `FooMiddleware` instance will + * be created and `index()` method will be executed. + * - A function returning a middleware. The middleware returned will be executed. + * + * For callables typed parameters are automatically injected using dependency injection container. + * + * @return self New instance of the {@see PushMiddlewareDispatcher} + */ + public function withMiddlewaresAdded(array $middlewareDefinitions): self + { + return $this->withMiddlewares([...array_reverse($this->middlewareDefinitions), ...$middlewareDefinitions]); + } + /** * @return bool Whether there are middleware defined in the dispatcher. */ diff --git a/src/Queue.php b/src/Queue.php index 20d80d89..3f7d8f59 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -21,12 +21,12 @@ final class Queue implements QueueInterface { /** - * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] + * @var array Queue-specific middleware definitions. */ private array $middlewareDefinitions; /** - * @var MessageHandlerPushInterface $finalPushHandler The final push handler in the middleware chain, responsible + * @var MessageHandlerPushInterface The final push handler in the middleware chain, responsible * for actually sending the message. Uses {@see SynchronousPushHandler} in synchronous mode or * {@see AdapterPushHandler} otherwise. */ @@ -34,18 +34,34 @@ final class Queue implements QueueInterface private string $name; + /** + * @var PushMiddlewareDispatcher The dispatcher used for push messages, combining base dispatcher middleware with + * queue-specific middleware. + */ + private PushMiddlewareDispatcher $dispatcher; + + /** + * @param WorkerInterface $worker The worker that processes messages. + * @param LoopInterface $loop The loop for controlling message processing. + * @param LoggerInterface $logger The logger for debug and informational messages. + * @param PushMiddlewareDispatcher $baseDispatcher The middleware dispatcher. + * @param AdapterInterface|null $adapter The message adapter (`null` for synchronous mode). + * @param string|BackedEnum $name The queue name. + * @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions Queue-specific middleware + * definitions. + */ public function __construct( private readonly WorkerInterface $worker, private readonly LoopInterface $loop, private readonly LoggerInterface $logger, - private readonly PushMiddlewareDispatcher $pushMiddlewareDispatcher, + private readonly PushMiddlewareDispatcher $baseDispatcher, private readonly ?AdapterInterface $adapter = null, string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE, MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions, ) { $this->name = StringNormalizer::normalize($name); - $this->middlewareDefinitions = $middlewareDefinitions; $this->finalPushHandler = $this->createFinalPushHandler(); + $this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions); } public function __clone() @@ -65,10 +81,7 @@ public function push(MessageInterface $message): MessageInterface ['messageType' => $message->getType()], ); - $message = $this->pushMiddlewareDispatcher->dispatch( - $message, - $this->createPushHandler(), - ); + $message = $this->dispatcher->dispatch($message, $this->finalPushHandler); if ($this->isSynchronous()) { $this->logger->info( @@ -143,19 +156,26 @@ public function status(string|int $id): MessageStatus public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self { $instance = clone $this; - $instance->middlewareDefinitions = $middlewareDefinitions; - + $instance->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions); return $instance; } public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self { $instance = clone $this; - $instance->middlewareDefinitions = [...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)]; - + $instance->setMiddlewaresAndPrepareDispatcher([...array_values($instance->middlewareDefinitions), ...array_values($middlewareDefinitions)]); return $instance; } + /** + * @param array $middlewareDefinitions + */ + private function setMiddlewaresAndPrepareDispatcher(array $middlewareDefinitions): void + { + $this->middlewareDefinitions = $middlewareDefinitions; + $this->dispatcher = $this->baseDispatcher->withMiddlewaresAdded($middlewareDefinitions); + } + private function handle(MessageInterface $message): bool { $this->worker->process($message, $this); @@ -163,35 +183,6 @@ private function handle(MessageInterface $message): bool return $this->loop->canContinue(); } - private function createPushHandler(): MessageHandlerPushInterface - { - return new class ( - $this->finalPushHandler, - $this->pushMiddlewareDispatcher, - $this->middlewareDefinitions, - ) implements MessageHandlerPushInterface { - public function __construct( - /** - * @var MessageHandlerPushInterface $finishHandler Final handler invoked after all middlewares are - * processed. - */ - private readonly MessageHandlerPushInterface $finishHandler, - private readonly PushMiddlewareDispatcher $dispatcher, - /** - * @var array|array[]|callable[]|MiddlewarePushInterface[]|string[] - */ - private readonly array $middlewares, - ) {} - - public function handlePush(MessageInterface $message): MessageInterface - { - return $this->dispatcher - ->withMiddlewares($this->middlewares) - ->dispatch($message, $this->finishHandler); - } - }; - } - private function createFinalPushHandler(): MessageHandlerPushInterface { return $this->isSynchronous() diff --git a/tests/Integration/MiddlewareTest.php b/tests/Integration/MiddlewareTest.php index fe22c71b..b575a4a8 100644 --- a/tests/Integration/MiddlewareTest.php +++ b/tests/Integration/MiddlewareTest.php @@ -42,6 +42,7 @@ public function testFullStackPush(): void 'channel 1', 'channel 2', 'channel 3', + 'channel 4', ]; $pushMiddlewareDispatcher = new PushMiddlewareDispatcher( @@ -66,7 +67,7 @@ public function testFullStackPush(): void $queue = $queue ->withMiddlewares(new TestMiddleware('Won\'t be executed')) ->withMiddlewares(new TestMiddleware('channel 1'), new TestMiddleware('channel 2')) - ->withMiddlewaresAdded(new TestMiddleware('channel 3')); + ->withMiddlewaresAdded(new TestMiddleware('channel 3'), new TestMiddleware('channel 4')); $message = new Message('test', ['initial']); $messagePushed = $queue->push($message);