From 0dfc2b594f145fe318b408921147dd3c46616f61 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 23 Feb 2026 13:00:39 -0800 Subject: [PATCH 1/4] fix(redis): tighten stale TCP connection detection and add fast lease deadline --- apps/sim/lib/core/config/redis.test.ts | 39 +++++++++++--------------- apps/sim/lib/core/config/redis.ts | 14 ++++++--- apps/sim/lib/execution/isolated-vm.ts | 31 +++++++++++++------- 3 files changed, 47 insertions(+), 37 deletions(-) diff --git a/apps/sim/lib/core/config/redis.test.ts b/apps/sim/lib/core/config/redis.test.ts index 7c740e2ec4..c42dfae6ef 100644 --- a/apps/sim/lib/core/config/redis.test.ts +++ b/apps/sim/lib/core/config/redis.test.ts @@ -29,9 +29,8 @@ describe('redis config', () => { getRedisClient() mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT')) - await vi.advanceTimersByTimeAsync(30_000) - await vi.advanceTimersByTimeAsync(30_000) - await vi.advanceTimersByTimeAsync(30_000) + await vi.advanceTimersByTimeAsync(15_000) + await vi.advanceTimersByTimeAsync(15_000) expect(listener).toHaveBeenCalledTimes(1) }) @@ -44,9 +43,9 @@ describe('redis config', () => { getRedisClient() mockRedisInstance.ping.mockResolvedValue('PONG') - await vi.advanceTimersByTimeAsync(30_000) - await vi.advanceTimersByTimeAsync(30_000) - await vi.advanceTimersByTimeAsync(30_000) + await vi.advanceTimersByTimeAsync(15_000) + await vi.advanceTimersByTimeAsync(15_000) + await vi.advanceTimersByTimeAsync(15_000) expect(listener).not.toHaveBeenCalled() }) @@ -58,34 +57,29 @@ describe('redis config', () => { getRedisClient() - // 2 failures then a success — should reset counter + // 1 failure then a success — should reset counter mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout')) - await vi.advanceTimersByTimeAsync(30_000) - mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout')) - await vi.advanceTimersByTimeAsync(30_000) + await vi.advanceTimersByTimeAsync(15_000) mockRedisInstance.ping.mockResolvedValueOnce('PONG') - await vi.advanceTimersByTimeAsync(30_000) + await vi.advanceTimersByTimeAsync(15_000) - // 2 more failures — should NOT trigger reconnect (counter was reset) - mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout')) - await vi.advanceTimersByTimeAsync(30_000) + // 1 more failure — should NOT trigger reconnect (counter was reset) mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout')) - await vi.advanceTimersByTimeAsync(30_000) + await vi.advanceTimersByTimeAsync(15_000) expect(listener).not.toHaveBeenCalled() }) - it('should call disconnect(true) after 3 consecutive PING failures', async () => { + it('should call disconnect(true) after 2 consecutive PING failures', async () => { const { getRedisClient } = await import('./redis') getRedisClient() mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT')) - await vi.advanceTimersByTimeAsync(30_000) - await vi.advanceTimersByTimeAsync(30_000) + await vi.advanceTimersByTimeAsync(15_000) expect(mockRedisInstance.disconnect).not.toHaveBeenCalled() - await vi.advanceTimersByTimeAsync(30_000) + await vi.advanceTimersByTimeAsync(15_000) expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true) }) @@ -100,9 +94,8 @@ describe('redis config', () => { getRedisClient() mockRedisInstance.ping.mockRejectedValue(new Error('timeout')) - await vi.advanceTimersByTimeAsync(30_000) - await vi.advanceTimersByTimeAsync(30_000) - await vi.advanceTimersByTimeAsync(30_000) + await vi.advanceTimersByTimeAsync(15_000) + await vi.advanceTimersByTimeAsync(15_000) expect(badListener).toHaveBeenCalledTimes(1) expect(goodListener).toHaveBeenCalledTimes(1) @@ -119,7 +112,7 @@ describe('redis config', () => { // After closing, PING failures should not trigger disconnect mockRedisInstance.ping.mockRejectedValue(new Error('timeout')) - await vi.advanceTimersByTimeAsync(30_000 * 5) + await vi.advanceTimersByTimeAsync(15_000 * 5) expect(mockRedisInstance.disconnect).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/lib/core/config/redis.ts b/apps/sim/lib/core/config/redis.ts index 4db71b49b2..98fe7b6853 100644 --- a/apps/sim/lib/core/config/redis.ts +++ b/apps/sim/lib/core/config/redis.ts @@ -11,8 +11,8 @@ let pingFailures = 0 let pingInterval: NodeJS.Timeout | null = null let pingInFlight = false -const PING_INTERVAL_MS = 30_000 -const MAX_PING_FAILURES = 3 +const PING_INTERVAL_MS = 15_000 +const MAX_PING_FAILURES = 2 /** Callbacks invoked when the PING health check forces a reconnect. */ const reconnectListeners: Array<() => void> = [] @@ -42,7 +42,7 @@ function startPingHealthCheck(redis: Redis): void { }) if (pingFailures >= MAX_PING_FAILURES) { - logger.error('Redis PING failed 3 consecutive times — forcing reconnect', { + logger.error('Redis PING failed consecutive times — forcing reconnect', { consecutiveFailures: pingFailures, }) pingFailures = 0 @@ -89,7 +89,13 @@ export function getRedisClient(): Redis | null { retryStrategy: (times) => { if (times > 10) { - logger.error(`Redis reconnection attempt ${times}`, { nextRetryMs: 30000 }) + // Log at the transition point and every 20 attempts (~10 min) to avoid unbounded log volume + if (times === 11 || times % 20 === 0) { + logger.error('Redis reconnection stalled — retrying every 30s', { + attempt: times, + nextRetryMs: 30000, + }) + } return 30000 } const base = Math.min(1000 * 2 ** (times - 1), 10000) diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index 7a733e0b42..ab088b257f 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -71,6 +71,7 @@ const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER = MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000 const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner' +const LEASE_REDIS_DEADLINE_MS = 200 const QUEUE_RETRY_DELAY_MS = 1000 const DISTRIBUTED_LEASE_GRACE_MS = 30000 @@ -292,17 +293,27 @@ async function tryAcquireDistributedLease( return 1 ` - try { - const result = await redis.eval( - script, - 1, - key, - now.toString(), - DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(), - expiresAt.toString(), - leaseId, - leaseTtlMs.toString() + const deadline = new Promise((_, reject) => + setTimeout( + () => reject(new Error(`Redis lease timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)), + LEASE_REDIS_DEADLINE_MS ) + ) + + try { + const result = await Promise.race([ + redis.eval( + script, + 1, + key, + now.toString(), + DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(), + expiresAt.toString(), + leaseId, + leaseTtlMs.toString() + ), + deadline, + ]) return Number(result) === 1 ? 'acquired' : 'limit_exceeded' } catch (error) { logger.error('Failed to acquire distributed owner lease', { ownerKey, error }) From 3d55db1bf9144f7e97f3277d4e313b9246ce86b1 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 23 Feb 2026 13:03:05 -0800 Subject: [PATCH 2/4] revert(redis): restore original retryStrategy logging --- apps/sim/lib/core/config/redis.ts | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/apps/sim/lib/core/config/redis.ts b/apps/sim/lib/core/config/redis.ts index 98fe7b6853..74e26da860 100644 --- a/apps/sim/lib/core/config/redis.ts +++ b/apps/sim/lib/core/config/redis.ts @@ -89,13 +89,7 @@ export function getRedisClient(): Redis | null { retryStrategy: (times) => { if (times > 10) { - // Log at the transition point and every 20 attempts (~10 min) to avoid unbounded log volume - if (times === 11 || times % 20 === 0) { - logger.error('Redis reconnection stalled — retrying every 30s', { - attempt: times, - nextRetryMs: 30000, - }) - } + logger.error(`Redis reconnection attempt ${times}`, { nextRetryMs: 30000 }) return 30000 } const base = Math.min(1000 * 2 ** (times - 1), 10000) From 5003c1d4246e235d5a24e09705d2747881b81245 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 23 Feb 2026 13:04:52 -0800 Subject: [PATCH 3/4] fix(redis): clear deadline timer after Promise.race to prevent memory leak --- apps/sim/lib/execution/isolated-vm.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index ab088b257f..54a3526e65 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -293,12 +293,13 @@ async function tryAcquireDistributedLease( return 1 ` - const deadline = new Promise((_, reject) => - setTimeout( + let deadlineTimer: NodeJS.Timeout | undefined + const deadline = new Promise((_, reject) => { + deadlineTimer = setTimeout( () => reject(new Error(`Redis lease timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)), LEASE_REDIS_DEADLINE_MS ) - ) + }) try { const result = await Promise.race([ @@ -318,6 +319,8 @@ async function tryAcquireDistributedLease( } catch (error) { logger.error('Failed to acquire distributed owner lease', { ownerKey, error }) return 'unavailable' + } finally { + clearTimeout(deadlineTimer) } } From 5ce2523325f92ce03f3ac58c4ac5c1a8cf17fcd4 Mon Sep 17 00:00:00 2001 From: waleed Date: Mon, 23 Feb 2026 13:14:29 -0800 Subject: [PATCH 4/4] =?UTF-8?q?fix(redis):=20downgrade=20lease=20fallback?= =?UTF-8?q?=20log=20to=20warn=20=E2=80=94=20unavailable=20is=20expected=20?= =?UTF-8?q?fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/sim/lib/execution/isolated-vm.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index 54a3526e65..3da81142f5 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -317,7 +317,10 @@ async function tryAcquireDistributedLease( ]) return Number(result) === 1 ? 'acquired' : 'limit_exceeded' } catch (error) { - logger.error('Failed to acquire distributed owner lease', { ownerKey, error }) + logger.warn('Failed to acquire distributed owner lease — falling back to local execution', { + ownerKey, + error, + }) return 'unavailable' } finally { clearTimeout(deadlineTimer)