diff --git a/src/Debug/QueueDecorator.php b/src/Debug/QueueDecorator.php index 908ad064..7400fae6 100644 --- a/src/Debug/QueueDecorator.php +++ b/src/Debug/QueueDecorator.php @@ -6,6 +6,7 @@ use Yiisoft\Queue\MessageStatus; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface; use Yiisoft\Queue\QueueInterface; final class QueueDecorator implements QueueInterface @@ -30,6 +31,16 @@ public function push(MessageInterface $message): MessageInterface return $message; } + public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self + { + return new self($this->queue->withMiddlewares(...$middlewareDefinitions), $this->collector); + } + + public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self + { + return new self($this->queue->withMiddlewaresAdded(...$middlewareDefinitions), $this->collector); + } + public function run(int $max = 0): int { return $this->queue->run($max); diff --git a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php index e303f5e6..ffef3a26 100644 --- a/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php +++ b/src/Middleware/FailureHandling/Implementation/ExponentialDelayMiddleware.php @@ -10,7 +10,6 @@ use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface; use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface; -use Yiisoft\Queue\Middleware\Push\NoopMessageHandlerPush; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope; @@ -65,11 +64,10 @@ public function processFailure( $message = $request->getMessage(); if ($this->suites($message)) { $envelope = new FailureEnvelope($message, $this->createNewMeta($message)); - $envelope = $this->delayMiddleware - ->withDelay($this->getDelay($envelope)) - ->processPush($envelope, new NoopMessageHandlerPush()); $queue = $this->queue ?? $request->getQueue(); - $messageNew = $queue->push($envelope); + $messageNew = $queue + ->withMiddlewaresAdded($this->delayMiddleware->withDelay($this->getDelay($envelope))) + ->push($envelope); return $request->withMessage($messageNew); } diff --git a/src/Middleware/Push/NoopMessageHandlerPush.php b/src/Middleware/Push/NoopMessageHandlerPush.php deleted file mode 100644 index 77b92d6d..00000000 --- a/src/Middleware/Push/NoopMessageHandlerPush.php +++ /dev/null @@ -1,18 +0,0 @@ -expects(self::exactly(7))->method('push')->willReturnCallback($queueCallback); + $queue->method('withMiddlewaresAdded')->willReturnSelf(); $queue->method('getName')->willReturn('simple'); $middlewares = [ diff --git a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php index 949646a9..f413db93 100644 --- a/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php +++ b/tests/Unit/Middleware/FailureHandling/Implementation/ExponentialDelayMiddlewareTest.php @@ -7,11 +7,16 @@ use Exception; use InvalidArgumentException; use PHPUnit\Framework\Attributes\DataProvider; +use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Message\Message; +use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\MessageStatus; use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope; use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest; use Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware; use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface; +use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface; +use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Stubs\StubDelayMiddleware; use Yiisoft\Queue\Tests\TestCase; @@ -134,6 +139,7 @@ public function testPipelineSuccess(): void { $message = new Message('test', null); $queue = $this->createMock(QueueInterface::class); + $queue->method('withMiddlewaresAdded')->willReturnSelf(); $queue->method('push')->willReturnArgument(0); $middleware = new ExponentialDelayMiddleware( 'test', @@ -184,4 +190,87 @@ public function testPipelineFailure(): void $request = new FailureHandlingRequest($message, $exception, $queue); $middleware->processFailure($request, $nextHandler); } + + public function testDelayMiddlewareWrapsActualRetryPush(): void + { + $message = new Message('test', null); + $adapter = new DelayAwareAdapter(); + $queue = $this->createQueue($adapter); + $middleware = new ExponentialDelayMiddleware( + 'test', + 1, + 1, + 1, + 1, + new AdapterContextDelayMiddleware($adapter), + $queue, + ); + + $request = new FailureHandlingRequest($message, new Exception('test'), $queue); + $middleware->processFailure($request, new ThrowingFailureHandler()); + + self::assertSame([1.0], $adapter->delaysDuringPush); + } +} + +final class AdapterContextDelayMiddleware implements DelayMiddlewareInterface +{ + private float $delay = 0.0; + + public function __construct( + private readonly DelayAwareAdapter $adapter, + ) {} + + public function withDelay(float $seconds): self + { + $new = clone $this; + $new->delay = $seconds; + + return $new; + } + + public function processPush(MessageInterface $message, MessageHandlerPushInterface $handler): MessageInterface + { + $this->adapter->activeDelay = $this->delay; + + try { + return $handler->handlePush($message); + } finally { + $this->adapter->activeDelay = null; + } + } +} + +final class DelayAwareAdapter implements AdapterInterface +{ + /** + * @var list + */ + public array $delaysDuringPush = []; + + public ?float $activeDelay = null; + + public function runExisting(callable $handlerCallback): void {} + + public function status(string|int $id): MessageStatus + { + return MessageStatus::DONE; + } + + public function push(MessageInterface $message): MessageInterface + { + $this->delaysDuringPush[] = $this->activeDelay; + + return $message; + } + + public function subscribe(callable $handlerCallback): void {} +} + +final class ThrowingFailureHandler implements MessageFailureHandlerInterface +{ + public function handleFailure(FailureHandlingRequest $request): FailureHandlingRequest + { + throw $request->getException(); + } } diff --git a/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php b/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php index 7641ea88..b3fc2c5f 100644 --- a/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php +++ b/tests/Unit/Middleware/FailureHandling/Implementation/SendAgainMiddlewareTest.php @@ -212,6 +212,7 @@ private function getPreparedQueue(array $metaResult, bool $suites): QueueInterfa }; $queue = $this->createMock(QueueInterface::class); + $queue->method('withMiddlewaresAdded')->willReturnSelf(); $queue->expects($suites ? self::once() : self::never()) ->method('push') ->willReturnCallback($queueAssertion);