diff --git a/config/di.php b/config/di.php index f7748b61..06de09e4 100644 --- a/config/di.php +++ b/config/di.php @@ -14,9 +14,9 @@ use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactoryInterface; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig; use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory; use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactoryInterface; -use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; use Yiisoft\Queue\Worker\Worker as QueueWorker; use Yiisoft\Queue\Worker\WorkerInterface; @@ -36,8 +36,8 @@ PushMiddlewareFactoryInterface::class => PushMiddlewareFactory::class, ConsumeMiddlewareFactoryInterface::class => ConsumeMiddlewareFactory::class, FailureMiddlewareFactoryInterface::class => FailureMiddlewareFactory::class, - PushMiddlewareDispatcher::class => [ - '__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-push']], + PushMiddlewareConfig::class => [ + '__construct()' => ['commonMiddlewareDefinitions' => $params['yiisoft/queue']['middlewares-push']], ], ConsumeMiddlewareDispatcher::class => [ '__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-consume']], diff --git a/docs/guide/en/configuration-manual.md b/docs/guide/en/configuration-manual.md index 220ee40d..f4d70346 100644 --- a/docs/guide/en/configuration-manual.md +++ b/docs/guide/en/configuration-manual.md @@ -22,8 +22,8 @@ use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareFactory; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig; use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory; -use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; use Yiisoft\Queue\Queue; use Yiisoft\Queue\Worker\Worker; @@ -51,7 +51,7 @@ $failureMiddlewareDispatcher = new FailureMiddlewareDispatcher( [], ); -$pushMiddlewareDispatcher = new PushMiddlewareDispatcher( +$pushMiddlewareConfig = new PushMiddlewareConfig( new PushMiddlewareFactory($container, $callableFactory), ); @@ -75,7 +75,7 @@ $queue = new Queue( $worker, $loop, $logger, - $pushMiddlewareDispatcher, + $pushMiddlewareConfig, ); // Now you can push messages diff --git a/docs/guide/en/synchronous-mode.md b/docs/guide/en/synchronous-mode.md index 8fe4493a..ce110955 100644 --- a/docs/guide/en/synchronous-mode.md +++ b/docs/guide/en/synchronous-mode.md @@ -15,15 +15,15 @@ $logger = $DIContainer->get(\Psr\Log\LoggerInterface::class); $worker = $DIContainer->get(\Yiisoft\Queue\Worker\WorkerInterface::class); $loop = $DIContainer->get(\Yiisoft\Queue\Cli\LoopInterface::class); -$pushMiddlewareDispatcher = $DIContainer->get( - \Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher::class +$pushMiddlewareConfig = $DIContainer->get( + \Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig::class ); $queue = new Yiisoft\Queue\Queue( $worker, $loop, $logger, - $pushMiddlewareDispatcher, + $pushMiddlewareConfig, ); ``` diff --git a/src/Middleware/Push/PushMiddlewareConfig.php b/src/Middleware/Push/PushMiddlewareConfig.php new file mode 100644 index 00000000..848635aa --- /dev/null +++ b/src/Middleware/Push/PushMiddlewareConfig.php @@ -0,0 +1,21 @@ + $commonMiddlewareDefinitions Middleware definitions. + */ + public function __construct( + public readonly PushMiddlewareFactoryInterface $middlewareFactory, + public readonly array $commonMiddlewareDefinitions = [], + ) {} +} diff --git a/src/Middleware/Push/PushMiddlewareDispatcher.php b/src/Middleware/Push/PushMiddlewareDispatcher.php index b500740e..697b25d4 100644 --- a/src/Middleware/Push/PushMiddlewareDispatcher.php +++ b/src/Middleware/Push/PushMiddlewareDispatcher.php @@ -6,7 +6,11 @@ use Closure; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Queue; +/** + * @internal Used internally by {@see Queue}. + */ final class PushMiddlewareDispatcher { /** @@ -15,35 +19,44 @@ final class PushMiddlewareDispatcher * @var PushMiddlewareStack|null The middleware stack. */ private ?PushMiddlewareStack $stack = null; + /** - * @var array[]|callable[]|PushMiddlewareInterface[]|string[] + * @param PushMiddlewareFactoryInterface $middlewareFactory Factory used to instantiate middleware. + * @param array $middlewareDefinitions Middleware definitions. + * @param PushHandlerInterface $finishHandler Finish message handler. */ - private array $middlewareDefinitions; - public function __construct( private readonly PushMiddlewareFactoryInterface $middlewareFactory, - array|callable|string|PushMiddlewareInterface ...$middlewareDefinitions, - ) { - $this->middlewareDefinitions = $middlewareDefinitions; - } + private array $middlewareDefinitions, + private PushHandlerInterface $finishHandler, + ) {} /** * Dispatch message through middleware to get response. * * @param MessageInterface $message Message to pass to middleware. - * @param PushHandlerInterface $finishHandler Handler to use in case no middleware produced a response. */ - public function dispatch( - MessageInterface $message, - PushHandlerInterface $finishHandler, - ): MessageInterface { + public function dispatch(MessageInterface $message): MessageInterface + { if ($this->stack === null) { - $this->stack = new PushMiddlewareStack($this->buildMiddlewares(), $finishHandler); + $this->stack = new PushMiddlewareStack($this->buildMiddlewares(), $this->finishHandler); } return $this->stack->handlePush($message); } + public function withFinishHandler(PushHandlerInterface $finishHandler): self + { + $instance = clone $this; + $instance->finishHandler = $finishHandler; + + // Fixes a memory leak. + unset($instance->stack); + $instance->stack = null; + + return $instance; + } + /** * Returns new instance with middleware handlers replaced with the ones provided. * diff --git a/src/Queue.php b/src/Queue.php index 8c389bf5..2da7e615 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -11,8 +11,9 @@ use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Middleware\Push\AdapterPushHandler; use Yiisoft\Queue\Middleware\Push\PushHandlerInterface; -use Yiisoft\Queue\Middleware\Push\PushMiddlewareInterface; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig; use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareInterface; use Yiisoft\Queue\Middleware\Push\SynchronousPushHandler; use Yiisoft\Queue\Worker\WorkerInterface; use Yiisoft\Queue\Message\IdEnvelope; @@ -25,13 +26,6 @@ final class Queue implements QueueInterface */ private array $middlewareDefinitions; - /** - * @var PushHandlerInterface The final push handler in the middleware chain, responsible - * for actually sending the message. Uses {@see SynchronousPushHandler} in synchronous mode or - * {@see AdapterPushHandler} otherwise. - */ - private PushHandlerInterface $finalPushHandler; - private string $name; /** @@ -40,11 +34,18 @@ final class Queue implements QueueInterface */ private PushMiddlewareDispatcher $dispatcher; + /** + * @var PushMiddlewareDispatcher The base dispatcher built from {@see PushMiddlewareConfig}. + * Holds the common middleware applied to all queues. + */ + private PushMiddlewareDispatcher $baseDispatcher; + /** * @param WorkerInterface $worker The worker that processes messages. * @param LoopInterface $loop The loop for controlling message processing. * @param LoggerInterface $logger The logger for debug and informational messages. - * @param PushMiddlewareDispatcher $baseDispatcher The middleware dispatcher. + * @param PushMiddlewareConfig $middlewareConfig The push middleware configuration: factory and common middleware + * definitions. * @param AdapterInterface|null $adapter The message adapter (`null` for synchronous mode). * @param string|BackedEnum $name The queue name. * @param PushMiddlewareInterface|callable|array|string ...$middlewareDefinitions Queue-specific middleware @@ -54,19 +55,25 @@ public function __construct( private readonly WorkerInterface $worker, private readonly LoopInterface $loop, private readonly LoggerInterface $logger, - private readonly PushMiddlewareDispatcher $baseDispatcher, + PushMiddlewareConfig $middlewareConfig, private readonly ?AdapterInterface $adapter = null, string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE, PushMiddlewareInterface|callable|array|string ...$middlewareDefinitions, ) { $this->name = StringNormalizer::normalize($name); - $this->finalPushHandler = $this->createFinalPushHandler(); + $this->baseDispatcher = new PushMiddlewareDispatcher( + $middlewareConfig->middlewareFactory, + $middlewareConfig->commonMiddlewareDefinitions, + $this->createFinalPushHandler(), + ); $this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions); } public function __clone() { - $this->finalPushHandler = $this->createFinalPushHandler(); + $finalPushHandler = $this->createFinalPushHandler(); + $this->baseDispatcher = $this->baseDispatcher->withFinishHandler($finalPushHandler); + $this->dispatcher = $this->dispatcher->withFinishHandler($finalPushHandler); } public function getName(): string @@ -81,7 +88,7 @@ public function push(MessageInterface $message): MessageInterface ['messageType' => $message->getType()], ); - $message = $this->dispatcher->dispatch($message, $this->finalPushHandler); + $message = $this->dispatcher->dispatch($message); if ($this->isSynchronous()) { $this->logger->info( diff --git a/tests/Benchmark/QueueBench.php b/tests/Benchmark/QueueBench.php index e094f1e1..67a58a59 100644 --- a/tests/Benchmark/QueueBench.php +++ b/tests/Benchmark/QueueBench.php @@ -18,8 +18,8 @@ use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig; use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory; -use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; use Yiisoft\Queue\Queue; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Tests\Benchmark\Support\VoidAdapter; @@ -59,7 +59,7 @@ public function __construct() $worker, new SimpleLoop(0), $logger, - new PushMiddlewareDispatcher(new PushMiddlewareFactory($container, $callableFactory)), + new PushMiddlewareConfig(new PushMiddlewareFactory($container, $callableFactory)), $this->adapter, ); } diff --git a/tests/Integration/MiddlewareTest.php b/tests/Integration/MiddlewareTest.php index 1d583b52..308e01d2 100644 --- a/tests/Integration/MiddlewareTest.php +++ b/tests/Integration/MiddlewareTest.php @@ -23,8 +23,8 @@ use Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware; use Yiisoft\Queue\Middleware\FailureHandling\Implementation\SendAgainMiddleware; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig; use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory; -use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; use Yiisoft\Queue\Queue; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Tests\Integration\Support\TestMiddleware; @@ -45,15 +45,17 @@ public function testFullStackPush(): void 'channel 4', ]; - $pushMiddlewareDispatcher = new PushMiddlewareDispatcher( + $pushMiddlewareConfig = new PushMiddlewareConfig( new PushMiddlewareFactory( $this->createMock(ContainerInterface::class), new CallableFactory( $this->createMock(ContainerInterface::class), ), ), - new TestMiddleware('common 1'), - new TestMiddleware('common 2'), + [ + new TestMiddleware('common 1'), + new TestMiddleware('common 2'), + ], ); $worker = $this->createMock(WorkerInterface::class); $worker->method('process')->willReturnArgument(0); @@ -61,7 +63,7 @@ public function testFullStackPush(): void $worker, $this->createMock(LoopInterface::class), $this->createMock(LoggerInterface::class), - $pushMiddlewareDispatcher, + $pushMiddlewareConfig, name: 'test', ); $queue = $queue diff --git a/tests/TestCase.php b/tests/TestCase.php index 3c9f852c..45be8e54 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -20,8 +20,8 @@ use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareFactory; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher; use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig; use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory; -use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; use Yiisoft\Queue\Queue; use Yiisoft\Queue\Worker\Worker; use Yiisoft\Queue\Worker\WorkerInterface; @@ -97,7 +97,7 @@ protected function createQueue( $this->getWorker(), $this->getLoop(), new NullLogger(), - $this->getPushMiddlewareDispatcher(), + $this->getPushMiddlewareConfig(), $adapter, $name, ); @@ -158,9 +158,9 @@ protected function getMessageHandlers(): array ]; } - protected function getPushMiddlewareDispatcher(): PushMiddlewareDispatcher + protected function getPushMiddlewareConfig(): PushMiddlewareConfig { - return new PushMiddlewareDispatcher( + return new PushMiddlewareConfig( new PushMiddlewareFactory( $this->getContainer(), new CallableFactory($this->getContainer()), diff --git a/tests/Unit/Middleware/Push/MiddlewareDispatcherTest.php b/tests/Unit/Middleware/Push/MiddlewareDispatcherTest.php index 6d26b259..ba80a425 100644 --- a/tests/Unit/Middleware/Push/MiddlewareDispatcherTest.php +++ b/tests/Unit/Middleware/Push/MiddlewareDispatcherTest.php @@ -33,7 +33,7 @@ static function (MessageInterface $message, PushHandlerInterface $handler): Mess ], ); - $result = $dispatcher->dispatch($message, $this->getRequestHandler()); + $result = $dispatcher->dispatch($message); $this->assertSame('New closure test data', $result->getData()); } @@ -46,7 +46,7 @@ public function testArrayMiddlewareCallableDefinition(): void ], ); $dispatcher = $this->createDispatcher($container)->withMiddlewares([[TestCallableMiddleware::class, 'index']]); - $result = $dispatcher->dispatch($message, $this->getRequestHandler()); + $result = $dispatcher->dispatch($message); $this->assertSame('New test data', $result->getData()); } @@ -59,7 +59,7 @@ public function testFactoryArrayDefinition(): void '__construct()' => ['message' => 'New test data from the definition'], ]; $dispatcher = $this->createDispatcher($container)->withMiddlewares([$definition]); - $result = $dispatcher->dispatch($message, $this->getRequestHandler()); + $result = $dispatcher->dispatch($message); $this->assertSame('New test data from the definition', $result->getData()); } @@ -76,7 +76,7 @@ public function testMiddlewareFullStackCalled(): void $dispatcher = $this->createDispatcher()->withMiddlewares([$middleware1, $middleware2]); - $result = $dispatcher->dispatch($message, $this->getRequestHandler()); + $result = $dispatcher->dispatch($message); $this->assertSame('new test data', $result->getData()); } @@ -89,7 +89,7 @@ public function testMiddlewareStackInterrupted(): void $dispatcher = $this->createDispatcher()->withMiddlewares([$middleware1, $middleware2]); - $result = $dispatcher->dispatch($message, $this->getRequestHandler()); + $result = $dispatcher->dispatch($message); $this->assertSame('first', $result->getData()); } @@ -129,24 +129,14 @@ public function testResetStackOnWithMiddlewares(): void $dispatcher = $this ->createDispatcher($container) ->withMiddlewares([[TestCallableMiddleware::class, 'index']]); - $dispatcher->dispatch($message, $this->getRequestHandler()); + $dispatcher->dispatch($message); $dispatcher = $dispatcher->withMiddlewares([TestMiddleware::class]); - $result = $dispatcher->dispatch($message, $this->getRequestHandler()); + $result = $dispatcher->dispatch($message); self::assertSame('New middleware test data', $result->getData()); } - private function getRequestHandler(): PushHandlerInterface - { - return new class implements PushHandlerInterface { - public function handlePush(MessageInterface $message): MessageInterface - { - return $message; - } - }; - } - private function createDispatcher( ?ContainerInterface $container = null, ): PushMiddlewareDispatcher { @@ -155,6 +145,13 @@ private function createDispatcher( return new PushMiddlewareDispatcher( new PushMiddlewareFactory($container, $callableFactory), + [], + new class implements PushHandlerInterface { + public function handlePush(MessageInterface $message): MessageInterface + { + return $message; + } + }, ); }