Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Yiisoft\Queue\MessageStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Queue\QueueInterface;

final class QueueDecorator implements QueueInterface
Expand All @@ -30,6 +31,16 @@ public function push(MessageInterface $message): MessageInterface
return $message;
}

public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
return new self($this->queue->withMiddlewares(...$middlewareDefinitions), $this->collector);
}

public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
return new self($this->queue->withMiddlewaresAdded(...$middlewareDefinitions), $this->collector);
}

public function run(int $max = 0): int
{
return $this->queue->run($max);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface;
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
use Yiisoft\Queue\Middleware\Push\NoopMessageHandlerPush;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;

Expand Down Expand Up @@ -65,11 +64,10 @@ public function processFailure(
$message = $request->getMessage();
if ($this->suites($message)) {
$envelope = new FailureEnvelope($message, $this->createNewMeta($message));
$envelope = $this->delayMiddleware
->withDelay($this->getDelay($envelope))
->processPush($envelope, new NoopMessageHandlerPush());
$queue = $this->queue ?? $request->getQueue();
$messageNew = $queue->push($envelope);
$messageNew = $queue
->withMiddlewaresAdded($this->delayMiddleware->withDelay($this->getDelay($envelope)))
->push($envelope);

return $request->withMessage($messageNew);
}
Expand Down
18 changes: 0 additions & 18 deletions src/Middleware/Push/NoopMessageHandlerPush.php

This file was deleted.

5 changes: 5 additions & 0 deletions src/QueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Yiisoft\Queue;

use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;

interface QueueInterface
{
Expand All @@ -17,6 +18,10 @@ interface QueueInterface
*/
public function push(MessageInterface $message): MessageInterface;

public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self;

Comment on lines 20 to +22
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withMiddlewares() / withMiddlewaresAdded() are new public API methods but are missing docblocks, unlike the rest of QueueInterface. Please document what they do, what argument formats are supported, and (importantly) the middleware execution order when adding/replacing definitions so callers know where the added middleware sits in the pipeline.

Suggested change
public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self;
/**
* Returns a queue instance with push middleware definitions replaced by the specified ones.
*
* Supported middleware definition formats are:
* - a {@see MiddlewarePushInterface} instance;
* - a callable middleware definition;
* - a string service ID or class name;
* - an array definition understood by the implementation/container.
*
* The resulting middleware pipeline uses the provided definitions in the same order they are passed
* to this method. Replacing definitions discards any previously configured middleware.
*
* @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions Middleware definitions
* to use for push handling.
*
* @return self A queue instance configured with exactly the specified push middleware definitions.
*/
public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self;
/**
* Returns a queue instance with additional push middleware definitions appended to the existing ones.
*
* Supported middleware definition formats are:
* - a {@see MiddlewarePushInterface} instance;
* - a callable middleware definition;
* - a string service ID or class name;
* - an array definition understood by the implementation/container.
*
* The resulting middleware pipeline preserves the existing middleware order and then appends the provided
* definitions in the same order they are passed to this method. In other words, previously configured
* middleware executes before the middleware added here.
*
* @param MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions Middleware definitions
* to append to the push handling pipeline.
*
* @return self A queue instance configured with the existing push middleware followed by the specified
* additional middleware definitions.
*/

Copilot uses AI. Check for mistakes.
public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self;

/**
* Handle all existing messages and exit
*
Expand Down
11 changes: 11 additions & 0 deletions stubs/StubQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Yiisoft\Queue\MessageStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Queue\QueueInterface;

/**
Expand All @@ -22,6 +23,16 @@ public function push(MessageInterface $message): MessageInterface
return $message;
}

public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
return clone $this;
}

public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
return clone $this;
}

public function run(int $max = 0): int
{
return 0;
Expand Down
11 changes: 11 additions & 0 deletions tests/App/DummyQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Exception;
use Yiisoft\Queue\MessageStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Queue\QueueInterface;

final class DummyQueue implements QueueInterface
Expand All @@ -20,6 +21,16 @@ public function push(MessageInterface $message): MessageInterface
return $message;
}

public function withMiddlewares(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
return clone $this;
}

public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): self
{
return clone $this;
}

public function run(int $max = 0): int
{
throw new Exception('`run()` method is not implemented yet.');
Expand Down
1 change: 1 addition & 0 deletions tests/Integration/MiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public function testFullStackFailure(): void
$callableFactory = new CallableFactory($container);

$queue->expects(self::exactly(7))->method('push')->willReturnCallback($queueCallback);
$queue->method('withMiddlewaresAdded')->willReturnSelf();
$queue->method('getName')->willReturn('simple');

$middlewares = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@
use Exception;
use InvalidArgumentException;
use PHPUnit\Framework\Attributes\DataProvider;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\MessageStatus;
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
use Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware;
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Stubs\StubDelayMiddleware;
use Yiisoft\Queue\Tests\TestCase;
Expand Down Expand Up @@ -134,6 +139,7 @@ public function testPipelineSuccess(): void
{
$message = new Message('test', null);
$queue = $this->createMock(QueueInterface::class);
$queue->method('withMiddlewaresAdded')->willReturnSelf();
$queue->method('push')->willReturnArgument(0);
$middleware = new ExponentialDelayMiddleware(
'test',
Expand Down Expand Up @@ -184,4 +190,87 @@ public function testPipelineFailure(): void
$request = new FailureHandlingRequest($message, $exception, $queue);
$middleware->processFailure($request, $nextHandler);
}

public function testDelayMiddlewareWrapsActualRetryPush(): void
{
$message = new Message('test', null);
$adapter = new DelayAwareAdapter();
$queue = $this->createQueue($adapter);
$middleware = new ExponentialDelayMiddleware(
'test',
1,
1,
1,
1,
new AdapterContextDelayMiddleware($adapter),
$queue,
);

$request = new FailureHandlingRequest($message, new Exception('test'), $queue);
$middleware->processFailure($request, new ThrowingFailureHandler());

self::assertSame([1.0], $adapter->delaysDuringPush);
}
}

final class AdapterContextDelayMiddleware implements DelayMiddlewareInterface
{
private float $delay = 0.0;

public function __construct(
private readonly DelayAwareAdapter $adapter,
) {}

public function withDelay(float $seconds): self
{
$new = clone $this;
$new->delay = $seconds;

return $new;
}

public function processPush(MessageInterface $message, MessageHandlerPushInterface $handler): MessageInterface
{
$this->adapter->activeDelay = $this->delay;

try {
return $handler->handlePush($message);
} finally {
$this->adapter->activeDelay = null;
}
}
}

final class DelayAwareAdapter implements AdapterInterface
{
/**
* @var list<float|null>
*/
public array $delaysDuringPush = [];

public ?float $activeDelay = null;

public function runExisting(callable $handlerCallback): void {}

public function status(string|int $id): MessageStatus
{
return MessageStatus::DONE;
}

public function push(MessageInterface $message): MessageInterface
{
$this->delaysDuringPush[] = $this->activeDelay;

return $message;
}

public function subscribe(callable $handlerCallback): void {}
}

final class ThrowingFailureHandler implements MessageFailureHandlerInterface
{
public function handleFailure(FailureHandlingRequest $request): FailureHandlingRequest
{
throw $request->getException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ private function getPreparedQueue(array $metaResult, bool $suites): QueueInterfa
};

$queue = $this->createMock(QueueInterface::class);
$queue->method('withMiddlewaresAdded')->willReturnSelf();
$queue->expects($suites ? self::once() : self::never())
->method('push')
->willReturnCallback($queueAssertion);
Expand Down