Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactoryInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactoryInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Worker\Worker as QueueWorker;
use Yiisoft\Queue\Worker\WorkerInterface;

Expand All @@ -36,8 +36,8 @@
PushMiddlewareFactoryInterface::class => PushMiddlewareFactory::class,
ConsumeMiddlewareFactoryInterface::class => ConsumeMiddlewareFactory::class,
FailureMiddlewareFactoryInterface::class => FailureMiddlewareFactory::class,
PushMiddlewareDispatcher::class => [
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-push']],
PushMiddlewareConfig::class => [
'__construct()' => ['commonMiddlewareDefinitions' => $params['yiisoft/queue']['middlewares-push']],
],
ConsumeMiddlewareDispatcher::class => [
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-consume']],
Expand Down
6 changes: 3 additions & 3 deletions docs/guide/en/configuration-manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareFactory;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\Worker\Worker;

Expand Down Expand Up @@ -51,7 +51,7 @@ $failureMiddlewareDispatcher = new FailureMiddlewareDispatcher(
[],
);

$pushMiddlewareDispatcher = new PushMiddlewareDispatcher(
$pushMiddlewareConfig = new PushMiddlewareConfig(
new PushMiddlewareFactory($container, $callableFactory),
);

Expand All @@ -75,7 +75,7 @@ $queue = new Queue(
$worker,
$loop,
$logger,
$pushMiddlewareDispatcher,
$pushMiddlewareConfig,
);

// Now you can push messages
Expand Down
6 changes: 3 additions & 3 deletions docs/guide/en/synchronous-mode.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ $logger = $DIContainer->get(\Psr\Log\LoggerInterface::class);

$worker = $DIContainer->get(\Yiisoft\Queue\Worker\WorkerInterface::class);
$loop = $DIContainer->get(\Yiisoft\Queue\Cli\LoopInterface::class);
$pushMiddlewareDispatcher = $DIContainer->get(
\Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher::class
$pushMiddlewareConfig = $DIContainer->get(
\Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig::class
);

$queue = new Yiisoft\Queue\Queue(
$worker,
$loop,
$logger,
$pushMiddlewareDispatcher,
$pushMiddlewareConfig,
);
```

Expand Down
21 changes: 21 additions & 0 deletions src/Middleware/Push/PushMiddlewareConfig.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Middleware\Push;

/**
* Holds the push middleware factory and the list of common middleware definitions
* applied to queues.
*/
final class PushMiddlewareConfig
{
/**
* @param PushMiddlewareFactoryInterface $middlewareFactory Factory used to instantiate middleware from definitions.
* @param array<array|callable|PushMiddlewareInterface|string> $commonMiddlewareDefinitions Middleware definitions.
*/
public function __construct(
public readonly PushMiddlewareFactoryInterface $middlewareFactory,
public readonly array $commonMiddlewareDefinitions = [],
) {}
}
39 changes: 26 additions & 13 deletions src/Middleware/Push/PushMiddlewareDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

use Closure;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Queue;

/**
* @internal Used internally by {@see Queue}.
Comment thread
vjik marked this conversation as resolved.
*/
final class PushMiddlewareDispatcher
{
/**
Expand All @@ -15,35 +19,44 @@ final class PushMiddlewareDispatcher
* @var PushMiddlewareStack|null The middleware stack.
*/
private ?PushMiddlewareStack $stack = null;

/**
* @var array[]|callable[]|PushMiddlewareInterface[]|string[]
* @param PushMiddlewareFactoryInterface $middlewareFactory Factory used to instantiate middleware.
* @param array<array|callable|PushMiddlewareInterface|string> $middlewareDefinitions Middleware definitions.
* @param PushHandlerInterface $finishHandler Finish message handler.
*/
private array $middlewareDefinitions;

public function __construct(
private readonly PushMiddlewareFactoryInterface $middlewareFactory,
array|callable|string|PushMiddlewareInterface ...$middlewareDefinitions,
) {
$this->middlewareDefinitions = $middlewareDefinitions;
}
private array $middlewareDefinitions,
private PushHandlerInterface $finishHandler,
) {}

/**
* Dispatch message through middleware to get response.
*
* @param MessageInterface $message Message to pass to middleware.
* @param PushHandlerInterface $finishHandler Handler to use in case no middleware produced a response.
*/
public function dispatch(
MessageInterface $message,
PushHandlerInterface $finishHandler,
): MessageInterface {
public function dispatch(MessageInterface $message): MessageInterface
{
if ($this->stack === null) {
$this->stack = new PushMiddlewareStack($this->buildMiddlewares(), $finishHandler);
$this->stack = new PushMiddlewareStack($this->buildMiddlewares(), $this->finishHandler);
}

return $this->stack->handlePush($message);
}

public function withFinishHandler(PushHandlerInterface $finishHandler): self
{
$instance = clone $this;
$instance->finishHandler = $finishHandler;

// Fixes a memory leak.
unset($instance->stack);
$instance->stack = null;

return $instance;
}

/**
* Returns new instance with middleware handlers replaced with the ones provided.
*
Expand Down
33 changes: 20 additions & 13 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Push\AdapterPushHandler;
use Yiisoft\Queue\Middleware\Push\PushHandlerInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareInterface;
use Yiisoft\Queue\Middleware\Push\SynchronousPushHandler;
Comment thread
vjik marked this conversation as resolved.
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;
Expand All @@ -25,13 +26,6 @@ final class Queue implements QueueInterface
*/
private array $middlewareDefinitions;

/**
* @var PushHandlerInterface The final push handler in the middleware chain, responsible
* for actually sending the message. Uses {@see SynchronousPushHandler} in synchronous mode or
* {@see AdapterPushHandler} otherwise.
*/
private PushHandlerInterface $finalPushHandler;

private string $name;

/**
Expand All @@ -40,11 +34,18 @@ final class Queue implements QueueInterface
*/
private PushMiddlewareDispatcher $dispatcher;

/**
* @var PushMiddlewareDispatcher The base dispatcher built from {@see PushMiddlewareConfig}.
* Holds the common middleware applied to all queues.
*/
private PushMiddlewareDispatcher $baseDispatcher;

/**
* @param WorkerInterface $worker The worker that processes messages.
* @param LoopInterface $loop The loop for controlling message processing.
* @param LoggerInterface $logger The logger for debug and informational messages.
* @param PushMiddlewareDispatcher $baseDispatcher The middleware dispatcher.
* @param PushMiddlewareConfig $middlewareConfig The push middleware configuration: factory and common middleware
* definitions.
* @param AdapterInterface|null $adapter The message adapter (`null` for synchronous mode).
* @param string|BackedEnum $name The queue name.
* @param PushMiddlewareInterface|callable|array|string ...$middlewareDefinitions Queue-specific middleware
Expand All @@ -54,19 +55,25 @@ public function __construct(
private readonly WorkerInterface $worker,
private readonly LoopInterface $loop,
private readonly LoggerInterface $logger,
private readonly PushMiddlewareDispatcher $baseDispatcher,
PushMiddlewareConfig $middlewareConfig,
private readonly ?AdapterInterface $adapter = null,
string|BackedEnum $name = QueueProviderInterface::DEFAULT_QUEUE,
PushMiddlewareInterface|callable|array|string ...$middlewareDefinitions,
) {
$this->name = StringNormalizer::normalize($name);
$this->finalPushHandler = $this->createFinalPushHandler();
$this->baseDispatcher = new PushMiddlewareDispatcher(
$middlewareConfig->middlewareFactory,
$middlewareConfig->commonMiddlewareDefinitions,
$this->createFinalPushHandler(),
);
$this->setMiddlewaresAndPrepareDispatcher($middlewareDefinitions);
}

public function __clone()
{
$this->finalPushHandler = $this->createFinalPushHandler();
$finalPushHandler = $this->createFinalPushHandler();
$this->baseDispatcher = $this->baseDispatcher->withFinishHandler($finalPushHandler);
$this->dispatcher = $this->dispatcher->withFinishHandler($finalPushHandler);
}

public function getName(): string
Expand All @@ -81,7 +88,7 @@ public function push(MessageInterface $message): MessageInterface
['messageType' => $message->getType()],
);

$message = $this->dispatcher->dispatch($message, $this->finalPushHandler);
$message = $this->dispatcher->dispatch($message);

if ($this->isSynchronous()) {
$this->logger->info(
Expand Down
4 changes: 2 additions & 2 deletions tests/Benchmark/QueueBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Tests\Benchmark\Support\VoidAdapter;
Expand Down Expand Up @@ -59,7 +59,7 @@ public function __construct()
$worker,
new SimpleLoop(0),
$logger,
new PushMiddlewareDispatcher(new PushMiddlewareFactory($container, $callableFactory)),
new PushMiddlewareConfig(new PushMiddlewareFactory($container, $callableFactory)),
$this->adapter,
);
}
Expand Down
12 changes: 7 additions & 5 deletions tests/Integration/MiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
use Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware;
use Yiisoft\Queue\Middleware\FailureHandling\Implementation\SendAgainMiddleware;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Tests\Integration\Support\TestMiddleware;
Expand All @@ -45,23 +45,25 @@ public function testFullStackPush(): void
'channel 4',
];

$pushMiddlewareDispatcher = new PushMiddlewareDispatcher(
$pushMiddlewareConfig = new PushMiddlewareConfig(
new PushMiddlewareFactory(
$this->createMock(ContainerInterface::class),
new CallableFactory(
$this->createMock(ContainerInterface::class),
),
),
new TestMiddleware('common 1'),
new TestMiddleware('common 2'),
[
new TestMiddleware('common 1'),
new TestMiddleware('common 2'),
],
);
$worker = $this->createMock(WorkerInterface::class);
$worker->method('process')->willReturnArgument(0);
$queue = new Queue(
$worker,
$this->createMock(LoopInterface::class),
$this->createMock(LoggerInterface::class),
$pushMiddlewareDispatcher,
$pushMiddlewareConfig,
name: 'test',
);
$queue = $queue
Expand Down
8 changes: 4 additions & 4 deletions tests/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareFactory;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareConfig;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareFactory;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\Worker\Worker;
use Yiisoft\Queue\Worker\WorkerInterface;
Expand Down Expand Up @@ -97,7 +97,7 @@ protected function createQueue(
$this->getWorker(),
$this->getLoop(),
new NullLogger(),
$this->getPushMiddlewareDispatcher(),
$this->getPushMiddlewareConfig(),
$adapter,
$name,
);
Expand Down Expand Up @@ -158,9 +158,9 @@ protected function getMessageHandlers(): array
];
}

protected function getPushMiddlewareDispatcher(): PushMiddlewareDispatcher
protected function getPushMiddlewareConfig(): PushMiddlewareConfig
{
return new PushMiddlewareDispatcher(
return new PushMiddlewareConfig(
new PushMiddlewareFactory(
$this->getContainer(),
new CallableFactory($this->getContainer()),
Expand Down
Loading
Loading