From e6be354e9d723201b8870267e14fecd85923d75d Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Sun, 22 Feb 2026 09:38:31 +0000 Subject: [PATCH 1/4] feat: add configurable connect and read timeouts to Redis connections Queue Redis connections previously used PHP's default timeout of 0 (blocking indefinitely). When Redis becomes unresponsive due to contention (e.g. Dragonfly TxQueue saturation), worker BRPOP calls would hang until the OS TCP timeout (21-127s), causing fatal "read error on connection" crashes and cascading restarts. Adds `connectTimeout` and `readTimeout` parameters (default 5s) to both `Redis` and `RedisCluster` connection classes so callers can configure fail-fast behaviour. Co-Authored-By: Claude Sonnet 4.6 --- src/Queue/Connection/Redis.php | 9 +++++++-- src/Queue/Connection/RedisCluster.php | 8 ++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index a0729b3..5aca4cb 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -10,14 +10,18 @@ class Redis implements Connection protected int $port; protected ?string $user; protected ?string $password; + protected float $connectTimeout; + protected float $readTimeout; protected ?\Redis $redis = null; - public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null) + public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null, float $connectTimeout = 5, float $readTimeout = 5) { $this->host = $host; $this->port = $port; $this->user = $user; $this->password = $password; + $this->connectTimeout = $connectTimeout; + $this->readTimeout = $readTimeout; } public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false @@ -186,7 +190,8 @@ protected function getRedis(): \Redis $this->redis = new \Redis(); - $this->redis->connect($this->host, $this->port); + $this->redis->connect($this->host, $this->port, $this->connectTimeout); + $this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout); return $this->redis; } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index ec9ec68..1d36fca 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -7,11 +7,15 @@ class RedisCluster implements Connection { protected array $seeds; + protected float $connectTimeout; + protected float $readTimeout; protected ?\RedisCluster $redis = null; - public function __construct(array $seeds) + public function __construct(array $seeds, float $connectTimeout = 5, float $readTimeout = 5) { $this->seeds = $seeds; + $this->connectTimeout = $connectTimeout; + $this->readTimeout = $readTimeout; } public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false @@ -181,7 +185,7 @@ protected function getRedis(): \RedisCluster return $this->redis; } - $this->redis = new \RedisCluster(null, $this->seeds); + $this->redis = new \RedisCluster(null, $this->seeds, $this->connectTimeout, $this->readTimeout); return $this->redis; } } From 75a4ffdae802162655680a635b691e8ded639edc Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Sun, 22 Feb 2026 11:51:55 +0000 Subject: [PATCH 2/4] fix: prevent socket timeout from aborting blocking Redis commands A fixed readTimeout on the connection caused brPop/blPop/bRPopLPush to throw RedisException whenever the command's block duration exceeded the socket read timeout, regardless of whether Redis responded correctly. Changes: - Default readTimeout to -1 (infinite) to preserve existing behavior; callers can opt in to a stricter timeout when needed (e.g. publishers) - Before each blocking call, temporarily widen OPT_READ_TIMEOUT to max($timeout + 1, $this->readTimeout) so the socket never fires before Redis returns - Restore the previous OPT_READ_TIMEOUT in a finally block - On RedisException in a blocking call, null $this->redis so getRedis() reconnects on the next call instead of reusing a broken connection - Deduplicate leftPopArray/rightPopArray to delegate to leftPop/rightPop Applies to both Redis and RedisCluster connection classes. Co-Authored-By: Claude Sonnet 4.6 --- src/Queue/Connection/Redis.php | 70 ++++++++++++++++++++++++--- src/Queue/Connection/RedisCluster.php | 70 ++++++++++++++++++++++++--- 2 files changed, 126 insertions(+), 14 deletions(-) diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 5aca4cb..cbcc317 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -14,7 +14,7 @@ class Redis implements Connection protected float $readTimeout; protected ?\Redis $redis = null; - public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null, float $connectTimeout = 5, float $readTimeout = 5) + public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null, float $connectTimeout = 5, float $readTimeout = -1) { $this->host = $host; $this->port = $port; @@ -34,9 +34,22 @@ public function rightPopLeftPushArray(string $queue, string $destination, int $t return json_decode($response, true); } + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false { - $response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT); + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->bRPopLPush($queue, $destination, $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev); + } + } if (!$response) { return false; @@ -44,6 +57,7 @@ public function rightPopLeftPush(string $queue, string $destination, int $timeou return $response; } + public function rightPushArray(string $queue, array $value): bool { return !!$this->getRedis()->rPush($queue, json_encode($value)); @@ -77,7 +91,19 @@ public function rightPopArray(string $queue, int $timeout): array|false public function rightPop(string $queue, int $timeout): string|false { - $response = $this->getRedis()->brPop([$queue], $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT); + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->brPop([$queue], $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev); + } + } if (empty($response)) { return false; @@ -88,18 +114,30 @@ public function rightPop(string $queue, int $timeout): string|false public function leftPopArray(string $queue, int $timeout): array|false { - $response = $this->getRedis()->blPop($queue, $timeout); + $response = $this->leftPop($queue, $timeout); - if (empty($response)) { + if ($response === false) { return false; } - return json_decode($response[1], true) ?? false; + return json_decode($response, true) ?? false; } public function leftPop(string $queue, int $timeout): string|false { - $response = $this->getRedis()->blPop($queue, $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT); + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->blPop($queue, $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev); + } + } if (empty($response)) { return false; @@ -195,4 +233,22 @@ protected function getRedis(): \Redis return $this->redis; } + + /** + * Returns the read timeout to use for a blocking command. + * Ensures the socket does not time out before Redis returns. + * A $timeout of 0 means block indefinitely, so we use -1 (infinite). + */ + private function blockingReadTimeout(int $timeout): float + { + if ($timeout <= 0) { + return -1; + } + // Add 1s buffer so the socket outlasts the Redis-side block timeout. + // Also respect an explicit readTimeout if it is already larger. + if ($this->readTimeout < 0) { + return -1; + } + return max((float)($timeout + 1), $this->readTimeout); + } } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 1d36fca..2e7fb3e 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -11,7 +11,7 @@ class RedisCluster implements Connection protected float $readTimeout; protected ?\RedisCluster $redis = null; - public function __construct(array $seeds, float $connectTimeout = 5, float $readTimeout = 5) + public function __construct(array $seeds, float $connectTimeout = 5, float $readTimeout = -1) { $this->seeds = $seeds; $this->connectTimeout = $connectTimeout; @@ -28,9 +28,22 @@ public function rightPopLeftPushArray(string $queue, string $destination, int $t return json_decode($response, true); } + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false { - $response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\RedisCluster::OPT_READ_TIMEOUT); + $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->bRPopLPush($queue, $destination, $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $prev); + } + } if (!$response) { return false; @@ -38,6 +51,7 @@ public function rightPopLeftPush(string $queue, string $destination, int $timeou return $response; } + public function rightPushArray(string $queue, array $value): bool { return !!$this->getRedis()->rPush($queue, json_encode($value)); @@ -71,7 +85,19 @@ public function rightPopArray(string $queue, int $timeout): array|false public function rightPop(string $queue, int $timeout): string|false { - $response = $this->getRedis()->brPop([$queue], $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\RedisCluster::OPT_READ_TIMEOUT); + $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->brPop([$queue], $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $prev); + } + } if (empty($response)) { return false; @@ -82,18 +108,30 @@ public function rightPop(string $queue, int $timeout): string|false public function leftPopArray(string $queue, int $timeout): array|false { - $response = $this->getRedis()->blPop($queue, $timeout); + $response = $this->leftPop($queue, $timeout); - if (empty($response)) { + if ($response === false) { return false; } - return json_decode($response[1], true) ?? false; + return json_decode($response, true) ?? false; } public function leftPop(string $queue, int $timeout): string|false { - $response = $this->getRedis()->blPop($queue, $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\RedisCluster::OPT_READ_TIMEOUT); + $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->blPop($queue, $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $prev); + } + } if (empty($response)) { return false; @@ -188,4 +226,22 @@ protected function getRedis(): \RedisCluster $this->redis = new \RedisCluster(null, $this->seeds, $this->connectTimeout, $this->readTimeout); return $this->redis; } + + /** + * Returns the read timeout to use for a blocking command. + * Ensures the socket does not time out before Redis returns. + * A $timeout of 0 means block indefinitely, so we use -1 (infinite). + */ + private function blockingReadTimeout(int $timeout): float + { + if ($timeout <= 0) { + return -1; + } + // Add 1s buffer so the socket outlasts the Redis-side block timeout. + // Also respect an explicit readTimeout if it is already larger. + if ($this->readTimeout < 0) { + return -1; + } + return max((float)($timeout + 1), $this->readTimeout); + } } From 148ab69a697e386ac0772734b7b0f955207c79bb Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Mon, 23 Feb 2026 01:06:14 +0000 Subject: [PATCH 3/4] feat: enhance constructor documentation for Redis and RedisCluster classes --- src/Queue/Connection/Redis.php | 13 +++++++++++++ src/Queue/Connection/RedisCluster.php | 10 ++++++++++ 2 files changed, 23 insertions(+) diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index cbcc317..ca0bfe4 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -14,6 +14,19 @@ class Redis implements Connection protected float $readTimeout; protected ?\Redis $redis = null; + /** + * @param string $host Redis host. + * @param int $port Redis port. + * @param string|null $user Redis ACL username (optional). + * @param string|null $password Redis password (optional). + * @param float $connectTimeout Connection timeout in seconds (0 = no timeout). + * @param float $readTimeout Socket read timeout in seconds (-1 = infinite). + * Use -1 for consumers so blocking commands (BRPOP/BLPOP) + * are not interrupted; the per-call blockingReadTimeout() + * helper adds a safety buffer automatically. + * Use a positive value (e.g. 5) for publishers so a hung + * Redis fails fast rather than blocking indefinitely. + */ public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null, float $connectTimeout = 5, float $readTimeout = -1) { $this->host = $host; diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 2e7fb3e..d968abc 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -11,6 +11,16 @@ class RedisCluster implements Connection protected float $readTimeout; protected ?\RedisCluster $redis = null; + /** + * @param array $seeds Cluster seed nodes in "host:port" format. + * @param float $connectTimeout Connection timeout in seconds per node (0 = no timeout). + * @param float $readTimeout Socket read timeout in seconds (-1 = infinite). + * Use -1 for consumers so blocking commands (BRPOP/BLPOP) + * are not interrupted; the per-call blockingReadTimeout() + * helper adds a safety buffer automatically. + * Use a positive value (e.g. 5) for publishers so a hung + * node fails fast rather than blocking indefinitely. + */ public function __construct(array $seeds, float $connectTimeout = 5, float $readTimeout = -1) { $this->seeds = $seeds; From 91f7159c1b7fb6e16713912b16f85c5c2d5c44d5 Mon Sep 17 00:00:00 2001 From: Damodar Lohani Date: Mon, 23 Feb 2026 01:27:26 +0000 Subject: [PATCH 4/4] fix: replace hardcoded OPT_READ_TIMEOUT with class constant in RedisCluster methods --- src/Queue/Connection/RedisCluster.php | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index d968abc..8d9e8c8 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -6,6 +6,11 @@ class RedisCluster implements Connection { + // OPT_READ_TIMEOUT (3) is defined on \Redis but not on \RedisCluster in all + // environments (e.g. Swoole replaces RedisCluster with its own coroutine-aware + // class that omits the constant). The numeric value is stable across phpredis. + private const OPT_READ_TIMEOUT = 3; + protected array $seeds; protected float $connectTimeout; protected float $readTimeout; @@ -42,8 +47,8 @@ public function rightPopLeftPushArray(string $queue, string $destination, int $t public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false { $redis = $this->getRedis(); - $prev = $redis->getOption(\RedisCluster::OPT_READ_TIMEOUT); - $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + $prev = $redis->getOption(self::OPT_READ_TIMEOUT); + $redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); try { $response = $redis->bRPopLPush($queue, $destination, $timeout); } catch (\RedisException $e) { @@ -51,7 +56,7 @@ public function rightPopLeftPush(string $queue, string $destination, int $timeou throw $e; } finally { if ($this->redis) { - $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $prev); + $redis->setOption(self::OPT_READ_TIMEOUT, $prev); } } @@ -96,8 +101,8 @@ public function rightPopArray(string $queue, int $timeout): array|false public function rightPop(string $queue, int $timeout): string|false { $redis = $this->getRedis(); - $prev = $redis->getOption(\RedisCluster::OPT_READ_TIMEOUT); - $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + $prev = $redis->getOption(self::OPT_READ_TIMEOUT); + $redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); try { $response = $redis->brPop([$queue], $timeout); } catch (\RedisException $e) { @@ -105,7 +110,7 @@ public function rightPop(string $queue, int $timeout): string|false throw $e; } finally { if ($this->redis) { - $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $prev); + $redis->setOption(self::OPT_READ_TIMEOUT, $prev); } } @@ -130,8 +135,8 @@ public function leftPopArray(string $queue, int $timeout): array|false public function leftPop(string $queue, int $timeout): string|false { $redis = $this->getRedis(); - $prev = $redis->getOption(\RedisCluster::OPT_READ_TIMEOUT); - $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + $prev = $redis->getOption(self::OPT_READ_TIMEOUT); + $redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); try { $response = $redis->blPop($queue, $timeout); } catch (\RedisException $e) { @@ -139,7 +144,7 @@ public function leftPop(string $queue, int $timeout): string|false throw $e; } finally { if ($this->redis) { - $redis->setOption(\RedisCluster::OPT_READ_TIMEOUT, $prev); + $redis->setOption(self::OPT_READ_TIMEOUT, $prev); } }