Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/guide/en/error-handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ It's configured via constructor parameters, too. Here they are:

Requirements:

- Requires a `DelayMiddlewareInterface` implementation and an adapter that supports delayed delivery.
- Requires an adapter that supports delayed delivery. The middleware uses `DelayEnvelope` to specify the delay time. If the adapter doesn't support delaying, it will **ignore the delay data** and process the message immediately.

State tracking:

Expand Down
12 changes: 6 additions & 6 deletions docs/guide/en/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ $queue->push($message);

To push a message that should be processed after 5 minutes:

Delayed execution is implemented via a push middleware.
The middleware must implement `\Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface` and is provided by the adapter package you use.
For example, the official AMQP adapter supports delays: <https://github.com/yiisoft/queue-amqp>
Delayed execution is implemented using the `DelayEnvelope`. The envelope wraps your message with delay information that adapters can use if they support delayed execution.

```php
$delayMiddleware = $container->get(\Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface::class);
$queue->withMiddlewaresAdded($delayMiddleware->withDelay(5 * 60))->push($message);
use Yiisoft\Queue\Message\DelayEnvelope;

$delayedMessage = new DelayEnvelope($message, 5 * 60); // 5 minutes delay
$queue->push($delayedMessage);
```

**Important:** Not every adapter (such as synchronous adapter) supports delayed execution.
**Important:** Adapters that support delaying will use the delay information from `DelayEnvelope` to schedule the message accordingly. Adapters that don't support delaying will **ignore the delay data** and process the message in the queue order.


## Queue handling
Expand Down
32 changes: 32 additions & 0 deletions src/Message/DelayEnvelope.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

final class DelayEnvelope extends Envelope
{
public const META_DELAY_SECONDS = 'yii-delay';

public function __construct(MessageInterface $message, private readonly float $delaySeconds)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why float? Sub-second delay is likely unnecessary.

{
parent::__construct($message);
}

public static function fromMessage(MessageInterface $message): static
{
/** @var float|int|string $delaySeconds */
$delaySeconds = $message->getMetadata()[self::META_DELAY_SECONDS] ?? 0.0;
return new self($message, (float) $delaySeconds);
}

public function getDelaySeconds(): float
{
return $this->delaySeconds;
}

protected function getEnvelopeMetadata(): array
{
return [self::META_DELAY_SECONDS => $this->delaySeconds];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface;
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
use Yiisoft\Queue\Middleware\Push\NoopMessageHandlerPush;
use Yiisoft\Queue\Message\DelayEnvelope;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;

Expand All @@ -29,7 +28,6 @@ final class ExponentialDelayMiddleware implements MiddlewareFailureInterface
* @param float $delayInitial The first delay period
* @param float $delayMaximum The maximum delay period
* @param float $exponent Message handling delay will be increased by this multiplication each time it fails
* @param DelayMiddlewareInterface $delayMiddleware A middleware for message delaying.
* @param QueueInterface|null $queue
*/
public function __construct(
Expand All @@ -38,7 +36,6 @@ public function __construct(
private readonly float $delayInitial,
private readonly float $delayMaximum,
private readonly float $exponent,
private readonly DelayMiddlewareInterface $delayMiddleware,
private readonly ?QueueInterface $queue = null,
) {
if ($maxAttempts <= 0) {
Expand All @@ -64,12 +61,10 @@ public function processFailure(
): FailureHandlingRequest {
$message = $request->getMessage();
if ($this->suites($message)) {
$envelope = new FailureEnvelope($message, $this->createNewMeta($message));
$envelope = $this->delayMiddleware
->withDelay($this->getDelay($envelope))
->processPush($envelope, new NoopMessageHandlerPush());
$failureEnvelope = new FailureEnvelope($message, $this->createNewMeta($message));
$delayEnvelope = new DelayEnvelope($failureEnvelope, $this->getDelay($failureEnvelope));
$queue = $this->queue ?? $request->getQueue();
$messageNew = $queue->push($envelope);
$messageNew = $queue->push($delayEnvelope);

return $request->withMessage($messageNew);
}
Expand All @@ -92,11 +87,14 @@ private function createNewMeta(MessageInterface $message): array

private function getAttempts(MessageInterface $message): int
{
return $message->getMetadata()[FailureEnvelope::FAILURE_META_KEY][self::META_KEY_ATTEMPTS . "-$this->id"] ?? 0;
/** @var array{failure-strategy-exponential-delay-attempts?: int} $failureMeta */
$failureMeta = $message->getMetadata()[FailureEnvelope::FAILURE_META_KEY] ?? [];
return $failureMeta[self::META_KEY_ATTEMPTS . "-$this->id"] ?? 0;
}

private function getDelay(MessageInterface $message): float
{
/** @var array{failure-strategy-exponential-delay-delay?: float} $meta */
$meta = $message->getMetadata()[FailureEnvelope::FAILURE_META_KEY] ?? [];
$key = self::META_KEY_DELAY . "-$this->id";

Expand Down
22 changes: 0 additions & 22 deletions src/Middleware/Push/Implementation/DelayMiddlewareInterface.php

This file was deleted.

18 changes: 0 additions & 18 deletions src/Middleware/Push/NoopMessageHandlerPush.php

This file was deleted.

22 changes: 0 additions & 22 deletions stubs/StubDelayMiddleware.php

This file was deleted.

2 changes: 0 additions & 2 deletions tests/Integration/MiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Stubs\StubDelayMiddleware;
use Yiisoft\Test\Support\Container\SimpleContainer;
use Yiisoft\Test\Support\Log\SimpleLogger;
use Yiisoft\Queue\Cli\LoopInterface;
Expand Down Expand Up @@ -150,7 +149,6 @@ public function testFullStackFailure(): void
1,
5,
2,
new StubDelayMiddleware(),
$queue,
),
],
Expand Down
45 changes: 45 additions & 0 deletions tests/Unit/Message/DelayEnvelopeTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Tests\Unit\Message;

use PHPUnit\Framework\TestCase;
use Yiisoft\Queue\Message\DelayEnvelope;
use Yiisoft\Queue\Message\Message;

final class DelayEnvelopeTest extends TestCase
{
public function testDelayEnvelope(): void
{
$message = new Message('test', ['data' => 'value']);
$delayEnvelope = new DelayEnvelope($message, 300.5);

self::assertSame($message, $delayEnvelope->getMessage());
self::assertSame('test', $delayEnvelope->getType());
self::assertSame(['data' => 'value'], $delayEnvelope->getData());
self::assertSame(300.5, $delayEnvelope->getDelaySeconds());

$metadata = $delayEnvelope->getMetadata();
self::assertArrayHasKey(DelayEnvelope::META_DELAY_SECONDS, $metadata);
self::assertSame(300.5, $metadata[DelayEnvelope::META_DELAY_SECONDS]);
}

public function testFromMessage(): void
{
$message = new Message('test', ['data' => 'value'], [DelayEnvelope::META_DELAY_SECONDS => 150]);
$delayEnvelope = DelayEnvelope::fromMessage($message);

self::assertSame(150.0, $delayEnvelope->getDelaySeconds());
self::assertSame('test', $delayEnvelope->getType());
self::assertSame(['data' => 'value'], $delayEnvelope->getData());
}

public function testFromMessageWithoutDelay(): void
{
$message = new Message('test', ['data' => 'value']);
$delayEnvelope = DelayEnvelope::fromMessage($message);

self::assertSame(0.0, $delayEnvelope->getDelaySeconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware;
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Stubs\StubDelayMiddleware;
use Yiisoft\Queue\Message\DelayEnvelope;
use Yiisoft\Queue\Tests\TestCase;

use const PHP_INT_MAX;
Expand Down Expand Up @@ -119,7 +119,6 @@ public static function constructorRequirementsProvider(): array
#[DataProvider('constructorRequirementsProvider')]
public function testConstructorRequirements(bool $success, array $arguments): void
{
$arguments[] = new StubDelayMiddleware();
$arguments[] = $this->createMock(QueueInterface::class);

if (!$success) {
Expand All @@ -141,7 +140,6 @@ public function testPipelineSuccess(): void
1,
1,
1,
new StubDelayMiddleware(),
$queue,
);
$nextHandler = $this->createMock(MessageFailureHandlerInterface::class);
Expand All @@ -152,6 +150,7 @@ public function testPipelineSuccess(): void
self::assertNotEquals($request, $result);
$message = $result->getMessage();
self::assertArrayHasKey(FailureEnvelope::FAILURE_META_KEY, $message->getMetadata());
self::assertArrayHasKey(DelayEnvelope::META_DELAY_SECONDS, $message->getMetadata());

$meta = $message->getMetadata()[FailureEnvelope::FAILURE_META_KEY];
self::assertArrayHasKey(ExponentialDelayMiddleware::META_KEY_ATTEMPTS . '-test', $meta);
Expand All @@ -175,7 +174,6 @@ public function testPipelineFailure(): void
1,
1,
1,
new StubDelayMiddleware(),
$queue,
);
$nextHandler = $this->createMock(MessageFailureHandlerInterface::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Stubs\StubDelayMiddleware;
use Yiisoft\Queue\Tests\TestCase;

class SendAgainMiddlewareTest extends TestCase
Expand Down Expand Up @@ -179,7 +178,6 @@ private function getStrategy(string $strategyName, QueueInterface $queue): Middl
self::EXPONENTIAL_STRATEGY_DELAY_INITIAL,
self::EXPONENTIAL_STRATEGY_DELAY_MAXIMUM,
self::EXPONENTIAL_STRATEGY_EXPONENT,
new StubDelayMiddleware(),
$queue,
),
default => throw new RuntimeException('Unknown strategy'),
Expand Down
Loading