diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index a0729b3..ca0bfe4 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -10,14 +10,31 @@ class Redis implements Connection protected int $port; protected ?string $user; protected ?string $password; + protected float $connectTimeout; + protected float $readTimeout; protected ?\Redis $redis = null; - public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null) + /** + * @param string $host Redis host. + * @param int $port Redis port. + * @param string|null $user Redis ACL username (optional). + * @param string|null $password Redis password (optional). + * @param float $connectTimeout Connection timeout in seconds (0 = no timeout). + * @param float $readTimeout Socket read timeout in seconds (-1 = infinite). + * Use -1 for consumers so blocking commands (BRPOP/BLPOP) + * are not interrupted; the per-call blockingReadTimeout() + * helper adds a safety buffer automatically. + * Use a positive value (e.g. 5) for publishers so a hung + * Redis fails fast rather than blocking indefinitely. + */ + public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null, float $connectTimeout = 5, float $readTimeout = -1) { $this->host = $host; $this->port = $port; $this->user = $user; $this->password = $password; + $this->connectTimeout = $connectTimeout; + $this->readTimeout = $readTimeout; } public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false @@ -30,9 +47,22 @@ public function rightPopLeftPushArray(string $queue, string $destination, int $t return json_decode($response, true); } + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false { - $response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT); + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->bRPopLPush($queue, $destination, $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev); + } + } if (!$response) { return false; @@ -40,6 +70,7 @@ public function rightPopLeftPush(string $queue, string $destination, int $timeou return $response; } + public function rightPushArray(string $queue, array $value): bool { return !!$this->getRedis()->rPush($queue, json_encode($value)); @@ -73,7 +104,19 @@ public function rightPopArray(string $queue, int $timeout): array|false public function rightPop(string $queue, int $timeout): string|false { - $response = $this->getRedis()->brPop([$queue], $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT); + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->brPop([$queue], $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev); + } + } if (empty($response)) { return false; @@ -84,18 +127,30 @@ public function rightPop(string $queue, int $timeout): string|false public function leftPopArray(string $queue, int $timeout): array|false { - $response = $this->getRedis()->blPop($queue, $timeout); + $response = $this->leftPop($queue, $timeout); - if (empty($response)) { + if ($response === false) { return false; } - return json_decode($response[1], true) ?? false; + return json_decode($response, true) ?? false; } public function leftPop(string $queue, int $timeout): string|false { - $response = $this->getRedis()->blPop($queue, $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT); + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->blPop($queue, $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev); + } + } if (empty($response)) { return false; @@ -186,8 +241,27 @@ protected function getRedis(): \Redis $this->redis = new \Redis(); - $this->redis->connect($this->host, $this->port); + $this->redis->connect($this->host, $this->port, $this->connectTimeout); + $this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout); return $this->redis; } + + /** + * Returns the read timeout to use for a blocking command. + * Ensures the socket does not time out before Redis returns. + * A $timeout of 0 means block indefinitely, so we use -1 (infinite). + */ + private function blockingReadTimeout(int $timeout): float + { + if ($timeout <= 0) { + return -1; + } + // Add 1s buffer so the socket outlasts the Redis-side block timeout. + // Also respect an explicit readTimeout if it is already larger. + if ($this->readTimeout < 0) { + return -1; + } + return max((float)($timeout + 1), $this->readTimeout); + } } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index ec9ec68..8d9e8c8 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -6,12 +6,31 @@ class RedisCluster implements Connection { + // OPT_READ_TIMEOUT (3) is defined on \Redis but not on \RedisCluster in all + // environments (e.g. Swoole replaces RedisCluster with its own coroutine-aware + // class that omits the constant). The numeric value is stable across phpredis. + private const OPT_READ_TIMEOUT = 3; + protected array $seeds; + protected float $connectTimeout; + protected float $readTimeout; protected ?\RedisCluster $redis = null; - public function __construct(array $seeds) + /** + * @param array $seeds Cluster seed nodes in "host:port" format. + * @param float $connectTimeout Connection timeout in seconds per node (0 = no timeout). + * @param float $readTimeout Socket read timeout in seconds (-1 = infinite). + * Use -1 for consumers so blocking commands (BRPOP/BLPOP) + * are not interrupted; the per-call blockingReadTimeout() + * helper adds a safety buffer automatically. + * Use a positive value (e.g. 5) for publishers so a hung + * node fails fast rather than blocking indefinitely. + */ + public function __construct(array $seeds, float $connectTimeout = 5, float $readTimeout = -1) { $this->seeds = $seeds; + $this->connectTimeout = $connectTimeout; + $this->readTimeout = $readTimeout; } public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false @@ -24,9 +43,22 @@ public function rightPopLeftPushArray(string $queue, string $destination, int $t return json_decode($response, true); } + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false { - $response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(self::OPT_READ_TIMEOUT); + $redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->bRPopLPush($queue, $destination, $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(self::OPT_READ_TIMEOUT, $prev); + } + } if (!$response) { return false; @@ -34,6 +66,7 @@ public function rightPopLeftPush(string $queue, string $destination, int $timeou return $response; } + public function rightPushArray(string $queue, array $value): bool { return !!$this->getRedis()->rPush($queue, json_encode($value)); @@ -67,7 +100,19 @@ public function rightPopArray(string $queue, int $timeout): array|false public function rightPop(string $queue, int $timeout): string|false { - $response = $this->getRedis()->brPop([$queue], $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(self::OPT_READ_TIMEOUT); + $redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->brPop([$queue], $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(self::OPT_READ_TIMEOUT, $prev); + } + } if (empty($response)) { return false; @@ -78,18 +123,30 @@ public function rightPop(string $queue, int $timeout): string|false public function leftPopArray(string $queue, int $timeout): array|false { - $response = $this->getRedis()->blPop($queue, $timeout); + $response = $this->leftPop($queue, $timeout); - if (empty($response)) { + if ($response === false) { return false; } - return json_decode($response[1], true) ?? false; + return json_decode($response, true) ?? false; } public function leftPop(string $queue, int $timeout): string|false { - $response = $this->getRedis()->blPop($queue, $timeout); + $redis = $this->getRedis(); + $prev = $redis->getOption(self::OPT_READ_TIMEOUT); + $redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout)); + try { + $response = $redis->blPop($queue, $timeout); + } catch (\RedisException $e) { + $this->redis = null; + throw $e; + } finally { + if ($this->redis) { + $redis->setOption(self::OPT_READ_TIMEOUT, $prev); + } + } if (empty($response)) { return false; @@ -181,7 +238,25 @@ protected function getRedis(): \RedisCluster return $this->redis; } - $this->redis = new \RedisCluster(null, $this->seeds); + $this->redis = new \RedisCluster(null, $this->seeds, $this->connectTimeout, $this->readTimeout); return $this->redis; } + + /** + * Returns the read timeout to use for a blocking command. + * Ensures the socket does not time out before Redis returns. + * A $timeout of 0 means block indefinitely, so we use -1 (infinite). + */ + private function blockingReadTimeout(int $timeout): float + { + if ($timeout <= 0) { + return -1; + } + // Add 1s buffer so the socket outlasts the Redis-side block timeout. + // Also respect an explicit readTimeout if it is already larger. + if ($this->readTimeout < 0) { + return -1; + } + return max((float)($timeout + 1), $this->readTimeout); + } }