From 8ecb7827acf3c9194e45db6b341e7cab9b186806 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 14:22:33 +0100 Subject: [PATCH 1/8] Add queue pause/resume --- src/CloudTasksApi.php | 2 + src/CloudTasksApiConcrete.php | 12 ++++++ src/CloudTasksApiContract.php | 4 ++ src/CloudTasksApiFake.php | 25 +++++++++++++ src/CloudTasksQueue.php | 14 +++++++ src/CloudTasksServiceProvider.php | 23 ++++++++++++ tests/CloudTasksApiTest.php | 62 +++++++++++++++++++++++++++++++ tests/PauseResumeQueueTest.php | 46 +++++++++++++++++++++++ 8 files changed, 188 insertions(+) create mode 100644 tests/PauseResumeQueueTest.php diff --git a/src/CloudTasksApi.php b/src/CloudTasksApi.php index 0b961fa..d25d666 100644 --- a/src/CloudTasksApi.php +++ b/src/CloudTasksApi.php @@ -12,6 +12,8 @@ * @method static void deleteTask(string $taskName) * @method static Task getTask(string $taskName) * @method static bool exists(string $taskName) + * @method static void pause(string $queue) + * @method static void resume(string $queue) */ class CloudTasksApi extends Facade { diff --git a/src/CloudTasksApiConcrete.php b/src/CloudTasksApiConcrete.php index b62c61e..639f71b 100644 --- a/src/CloudTasksApiConcrete.php +++ b/src/CloudTasksApiConcrete.php @@ -9,6 +9,8 @@ use Google\Cloud\Tasks\V2\GetTaskRequest; use Google\Cloud\Tasks\V2\CreateTaskRequest; use Google\Cloud\Tasks\V2\DeleteTaskRequest; +use Google\Cloud\Tasks\V2\PauseQueueRequest; +use Google\Cloud\Tasks\V2\ResumeQueueRequest; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; class CloudTasksApiConcrete implements CloudTasksApiContract @@ -65,4 +67,14 @@ public function exists(string $taskName): bool return false; } + + public function pause(string $queue): void + { + $this->client->pauseQueue(PauseQueueRequest::build($queue)); + } + + public function resume(string $queue): void + { + $this->client->resumeQueue(ResumeQueueRequest::build($queue)); + } } diff --git a/src/CloudTasksApiContract.php b/src/CloudTasksApiContract.php index 5f0af35..fdaa562 100644 --- a/src/CloudTasksApiContract.php +++ b/src/CloudTasksApiContract.php @@ -15,4 +15,8 @@ public function deleteTask(string $taskName): void; public function getTask(string $taskName): Task; public function exists(string $taskName): bool; + + public function pause(string $queue): void; + + public function resume(string $queue): void; } diff --git a/src/CloudTasksApiFake.php b/src/CloudTasksApiFake.php index 773fcdc..05b5cea 100644 --- a/src/CloudTasksApiFake.php +++ b/src/CloudTasksApiFake.php @@ -23,6 +23,11 @@ class CloudTasksApiFake implements CloudTasksApiContract */ public array $deletedTasks = []; + /** + * @var array + */ + public array $pausedQueues = []; + public function createTask(string $queueName, Task $task): Task { $this->createdTasks[] = compact('queueName', 'task'); @@ -51,6 +56,16 @@ public function exists(string $taskName): bool return false; } + public function pause(string $queue): void + { + $this->pausedQueues[$queue] = true; + } + + public function resume(string $queue): void + { + unset($this->pausedQueues[$queue]); + } + public function assertTaskDeleted(string $taskName): void { Assert::assertTrue( @@ -85,4 +100,14 @@ public function assertCreatedTaskCount(int $count): void { Assert::assertCount($count, $this->createdTasks); } + + public function assertQueuePaused(string $queue): void + { + Assert::assertTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to be paused, but is not'); + } + + public function assertQueueNotPaused(string $queue): void + { + Assert::assertNotTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to not be paused, but it is'); + } } diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index b23780b..11e1078 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -467,4 +467,18 @@ private function getCloudRunJobEnvVars(string $encodedPayload, string $taskName) return $envVars; } + + public function pause(string $queue): void + { + $queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue); + + CloudTasksApi::pause($queue); + } + + public function resume(string $queue): void + { + $queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue); + + CloudTasksApi::resume($queue); + } } diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index 8e4833a..8eaf1a4 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -7,8 +7,11 @@ use Illuminate\Routing\Router; use Illuminate\Events\Dispatcher; use Illuminate\Queue\QueueManager; +use Illuminate\Support\Facades\Queue; use Illuminate\Foundation\Application; use Illuminate\Queue\Events\JobFailed; +use Illuminate\Queue\Events\QueuePaused; +use Illuminate\Queue\Events\QueueResumed; use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Queue\Events\JobExceptionOccurred; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; @@ -113,6 +116,26 @@ private function registerEvents(): void return; } }); + + $events->listen(QueuePaused::class, function (QueuePaused $event) { + $queue = Queue::connection($event->connection); + + if (! $queue instanceof CloudTasksQueue) { + return; + } + + $queue->pause($event->queue); + }); + + $events->listen(QueueResumed::class, function (QueueResumed $event) { + $queue = Queue::connection($event->connection); + + if (! $queue instanceof CloudTasksQueue) { + return; + } + + $queue->resume($event->queue); + }); } private function registerCommands(): void diff --git a/tests/CloudTasksApiTest.php b/tests/CloudTasksApiTest.php index 5b42338..a92cec7 100644 --- a/tests/CloudTasksApiTest.php +++ b/tests/CloudTasksApiTest.php @@ -9,7 +9,9 @@ use Google\ApiCore\ApiException; use Google\Cloud\Tasks\V2\HttpMethod; use Google\Cloud\Tasks\V2\HttpRequest; +use Google\Cloud\Tasks\V2\Queue\State; use PHPUnit\Framework\Attributes\Test; +use Google\Cloud\Tasks\V2\GetQueueRequest; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi; @@ -138,4 +140,64 @@ public function test_delete_task() $this->expectExceptionMessage('NOT_FOUND'); CloudTasksApi::getTask($task->getName()); } + + #[Test] + public function it_can_pause_queues(): void + { + $queueName = $this->client->queueName( + env('CI_CLOUD_TASKS_PROJECT_ID'), + env('CI_CLOUD_TASKS_LOCATION'), + env('CI_CLOUD_TASKS_QUEUE').'-pause' + ); + + $this->ensureQueueIs($queueName, State::RUNNING); + + // Act + CloudTasksApi::pause($queueName); + + // Assert + $this->assertEquals(State::PAUSED, $this->getQueueState($queueName)); + } + + #[Test] + public function it_can_resume_queues(): void + { + $queueName = $this->client->queueName( + env('CI_CLOUD_TASKS_PROJECT_ID'), + env('CI_CLOUD_TASKS_LOCATION'), + env('CI_CLOUD_TASKS_QUEUE').'-pause' + ); + + $this->ensureQueueIs($queueName, State::PAUSED); + + // Act + CloudTasksApi::resume($queueName); + + // Assert + $this->assertEquals(State::RUNNING, $this->getQueueState($queueName)); + } + + private function getQueueState(string $queue): int + { + return $this->client->getQueue(GetQueueRequest::build($queue))->getState(); + } + + private function ensureQueueIs(string $queue, int $desiredState): void + { + $currentState = $this->getQueueState($queue); + + if ($currentState === $desiredState) { + return; + } + + if ($currentState === State::RUNNING && $desiredState === State::PAUSED) { + CloudTasksApi::pause($queue); + } + + if ($currentState === State::PAUSED && $desiredState === State::RUNNING) { + CloudTasksApi::resume($queue); + } + + $this->assertEquals($desiredState, $this->getQueueState($queue)); + } } diff --git a/tests/PauseResumeQueueTest.php b/tests/PauseResumeQueueTest.php new file mode 100644 index 0000000..52315fa --- /dev/null +++ b/tests/PauseResumeQueueTest.php @@ -0,0 +1,46 @@ +version() < 12) { + $this->markTestSkipped('This feature only exists in Laravel 12 and up.'); + } + + CloudTasksApi::fake(); + + // $this->artisan('queue:pause cloudtasks:barbequeue'); + Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); + + // Assert + CloudTasksApi::assertQueuePaused('barbequeue'); + } + + #[Test] + public function queue_can_be_resumed(): void + { + // Arrange + if (app()->version() < 12) { + $this->markTestSkipped('This feature only exists in Laravel 12 and up.'); + } + + CloudTasksApi::fake(); + + Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); + Artisan::call('queue:continue my-cloudtasks-connection:barbequeue'); + + // Assert + CloudTasksApi::assertQueueNotPaused('barbequeue'); + } +} From f438288932dea71006e164549c0b600de1df3c6d Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 14:26:01 +0100 Subject: [PATCH 2/8] Fix event listening for Laravel 11 --- src/CloudTasksServiceProvider.php | 32 +++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index 8eaf1a4..c285053 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -117,25 +117,29 @@ private function registerEvents(): void } }); - $events->listen(QueuePaused::class, function (QueuePaused $event) { - $queue = Queue::connection($event->connection); + if (class_exists('Illuminate\Queue\Events\QueuePaused')) { + $events->listen(QueuePaused::class, function (QueuePaused $event) { + $queue = Queue::connection($event->connection); - if (! $queue instanceof CloudTasksQueue) { - return; - } + if (! $queue instanceof CloudTasksQueue) { + return; + } - $queue->pause($event->queue); - }); + $queue->pause($event->queue); + }); + } - $events->listen(QueueResumed::class, function (QueueResumed $event) { - $queue = Queue::connection($event->connection); + if (class_exists('Illuminate\Queue\Events\QueueResumed')) { + $events->listen(QueueResumed::class, function (QueueResumed $event) { + $queue = Queue::connection($event->connection); - if (! $queue instanceof CloudTasksQueue) { - return; - } + if (! $queue instanceof CloudTasksQueue) { + return; + } - $queue->resume($event->queue); - }); + $queue->resume($event->queue); + }); + } } private function registerCommands(): void From 9513b5301a0ba8d20977efd73679c9a789fd6c62 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 14:34:15 +0100 Subject: [PATCH 3/8] Add PHPStan ignores --- src/CloudTasksServiceProvider.php | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index c285053..01e79d0 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -118,26 +118,26 @@ private function registerEvents(): void }); if (class_exists('Illuminate\Queue\Events\QueuePaused')) { - $events->listen(QueuePaused::class, function (QueuePaused $event) { - $queue = Queue::connection($event->connection); + $events->listen(QueuePaused::class, function (QueuePaused $event) { // @phpstan-ignore-line + $queue = Queue::connection($event->connection); // @phpstan-ignore-line if (! $queue instanceof CloudTasksQueue) { return; } - $queue->pause($event->queue); + $queue->pause($event->queue); // @phpstan-ignore-line }); } if (class_exists('Illuminate\Queue\Events\QueueResumed')) { - $events->listen(QueueResumed::class, function (QueueResumed $event) { - $queue = Queue::connection($event->connection); + $events->listen(QueueResumed::class, function (QueueResumed $event) { // @phpstan-ignore-line + $queue = Queue::connection($event->connection); // @phpstan-ignore-line if (! $queue instanceof CloudTasksQueue) { return; } - $queue->resume($event->queue); + $queue->resume($event->queue); // @phpstan-ignore-line }); } } From 08294bbcf40cb17c60bc44dad42aa9bca5722196 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 16:42:51 +0100 Subject: [PATCH 4/8] Implement a retry --- tests/CloudTasksApiTest.php | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/tests/CloudTasksApiTest.php b/tests/CloudTasksApiTest.php index a92cec7..78c50bd 100644 --- a/tests/CloudTasksApiTest.php +++ b/tests/CloudTasksApiTest.php @@ -156,7 +156,7 @@ public function it_can_pause_queues(): void CloudTasksApi::pause($queueName); // Assert - $this->assertEquals(State::PAUSED, $this->getQueueState($queueName)); + $this->assertEquals(State::PAUSED, $this->waitForQueueState($queueName, State::PAUSED)); } #[Test] @@ -174,7 +174,7 @@ public function it_can_resume_queues(): void CloudTasksApi::resume($queueName); // Assert - $this->assertEquals(State::RUNNING, $this->getQueueState($queueName)); + $this->assertEquals(State::RUNNING, $this->waitForQueueState($queueName, State::RUNNING)); } private function getQueueState(string $queue): int @@ -182,6 +182,30 @@ private function getQueueState(string $queue): int return $this->client->getQueue(GetQueueRequest::build($queue))->getState(); } + private function waitForQueueState(string $queue, int $waitForState): ?int + { + $state = null; + $attempts = 0; + + while ($state !== $waitForState) { + $state = $this->getQueueState($queue); + + if ($state === $waitForState) { + return $state; + } + + $attempts++; + + if ($attempts >= 10) { + break; + } + + sleep(1); + } + + return $state; + } + private function ensureQueueIs(string $queue, int $desiredState): void { $currentState = $this->getQueueState($queue); From c90bfede08e0060af57b1354080b8404ce864ff8 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Fri, 27 Feb 2026 16:48:22 +0100 Subject: [PATCH 5/8] Fix --- tests/CloudTasksApiTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/CloudTasksApiTest.php b/tests/CloudTasksApiTest.php index 78c50bd..24a42bb 100644 --- a/tests/CloudTasksApiTest.php +++ b/tests/CloudTasksApiTest.php @@ -222,6 +222,6 @@ private function ensureQueueIs(string $queue, int $desiredState): void CloudTasksApi::resume($queue); } - $this->assertEquals($desiredState, $this->getQueueState($queue)); + $this->assertEquals($desiredState, $this->waitForQueueState($queue, $desiredState)); } } From 841295048858e166f27e38c1c507e91dfefc5bc4 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Sat, 28 Feb 2026 11:43:38 +0100 Subject: [PATCH 6/8] Remove comment --- tests/PauseResumeQueueTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/PauseResumeQueueTest.php b/tests/PauseResumeQueueTest.php index 52315fa..1c321fe 100644 --- a/tests/PauseResumeQueueTest.php +++ b/tests/PauseResumeQueueTest.php @@ -20,7 +20,6 @@ public function queue_can_be_paused(): void CloudTasksApi::fake(); - // $this->artisan('queue:pause cloudtasks:barbequeue'); Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); // Assert From 3222ed3fe385d0437c01cbb0b4c926d57bdfd066 Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Wed, 4 Mar 2026 22:15:12 +0100 Subject: [PATCH 7/8] Fix bug --- src/CloudTasksQueue.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index 11e1078..1a7ed97 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -472,13 +472,13 @@ public function pause(string $queue): void { $queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue); - CloudTasksApi::pause($queue); + CloudTasksApi::pause($queueName); } public function resume(string $queue): void { $queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue); - CloudTasksApi::resume($queue); + CloudTasksApi::resume($queueName); } } From 3d977635dca01db26ec73491e9e2e3db3955c62d Mon Sep 17 00:00:00 2001 From: Marick van Tuil Date: Wed, 4 Mar 2026 22:15:30 +0100 Subject: [PATCH 8/8] Refactor --- phpstan.neon | 4 +++- src/CloudTasksApiFake.php | 4 ++-- src/CloudTasksServiceProvider.php | 34 ++++++++++++++----------------- tests/PauseResumeQueueTest.php | 28 +++++++++++-------------- 4 files changed, 32 insertions(+), 38 deletions(-) diff --git a/phpstan.neon b/phpstan.neon index 579f511..1211568 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -6,4 +6,6 @@ parameters: - src level: 9 ignoreErrors: - - "/dispatchAfterCommit with no type specified/" \ No newline at end of file + - "/dispatchAfterCommit with no type specified/" + - message: "#Illuminate\\\\Queue\\\\Events\\\\Queue(Paused|Resumed)#" + identifier: class.notFound \ No newline at end of file diff --git a/src/CloudTasksApiFake.php b/src/CloudTasksApiFake.php index 05b5cea..1e99969 100644 --- a/src/CloudTasksApiFake.php +++ b/src/CloudTasksApiFake.php @@ -103,11 +103,11 @@ public function assertCreatedTaskCount(int $count): void public function assertQueuePaused(string $queue): void { - Assert::assertTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to be paused, but is not'); + Assert::assertArrayHasKey($queue, $this->pausedQueues, 'Expected queue ['.$queue.'] to be paused, but it is not.'); } public function assertQueueNotPaused(string $queue): void { - Assert::assertNotTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to not be paused, but it is'); + Assert::assertArrayNotHasKey($queue, $this->pausedQueues, 'Expected queue ['.$queue.'] to not be paused, but it is.'); } } diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index 01e79d0..6899543 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -117,29 +117,25 @@ private function registerEvents(): void } }); - if (class_exists('Illuminate\Queue\Events\QueuePaused')) { - $events->listen(QueuePaused::class, function (QueuePaused $event) { // @phpstan-ignore-line - $queue = Queue::connection($event->connection); // @phpstan-ignore-line - - if (! $queue instanceof CloudTasksQueue) { - return; - } - - $queue->pause($event->queue); // @phpstan-ignore-line - }); + if (! class_exists(QueuePaused::class)) { + return; } - if (class_exists('Illuminate\Queue\Events\QueueResumed')) { - $events->listen(QueueResumed::class, function (QueueResumed $event) { // @phpstan-ignore-line - $queue = Queue::connection($event->connection); // @phpstan-ignore-line + $events->listen(QueuePaused::class, function (QueuePaused $event) { + $queue = Queue::connection($event->connection); - if (! $queue instanceof CloudTasksQueue) { - return; - } + if ($queue instanceof CloudTasksQueue) { + $queue->pause($event->queue); + } + }); - $queue->resume($event->queue); // @phpstan-ignore-line - }); - } + $events->listen(QueueResumed::class, function (QueueResumed $event) { + $queue = Queue::connection($event->connection); + + if ($queue instanceof CloudTasksQueue) { + $queue->resume($event->queue); + } + }); } private function registerCommands(): void diff --git a/tests/PauseResumeQueueTest.php b/tests/PauseResumeQueueTest.php index 1c321fe..9c243d5 100644 --- a/tests/PauseResumeQueueTest.php +++ b/tests/PauseResumeQueueTest.php @@ -10,36 +10,32 @@ class PauseResumeQueueTest extends TestCase { - #[Test] - public function queue_can_be_paused(): void + protected function setUp(): void { - // Arrange - if (app()->version() < 12) { + parent::setUp(); + + if (version_compare(app()->version(), '12.0.0', '<')) { $this->markTestSkipped('This feature only exists in Laravel 12 and up.'); } CloudTasksApi::fake(); + } + #[Test] + public function queue_can_be_paused(): void + { Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); - // Assert - CloudTasksApi::assertQueuePaused('barbequeue'); + CloudTasksApi::assertQueuePaused('projects/my-test-project/locations/europe-west6/queues/barbequeue'); } #[Test] public function queue_can_be_resumed(): void { - // Arrange - if (app()->version() < 12) { - $this->markTestSkipped('This feature only exists in Laravel 12 and up.'); - } - - CloudTasksApi::fake(); - Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); - Artisan::call('queue:continue my-cloudtasks-connection:barbequeue'); + CloudTasksApi::assertQueuePaused('projects/my-test-project/locations/europe-west6/queues/barbequeue'); - // Assert - CloudTasksApi::assertQueueNotPaused('barbequeue'); + Artisan::call('queue:continue my-cloudtasks-connection:barbequeue'); + CloudTasksApi::assertQueueNotPaused('projects/my-test-project/locations/europe-west6/queues/barbequeue'); } }