diff --git a/apps/sim/lib/core/config/redis.test.ts b/apps/sim/lib/core/config/redis.test.ts
index 7c740e2ec4..c42dfae6ef 100644
--- a/apps/sim/lib/core/config/redis.test.ts
+++ b/apps/sim/lib/core/config/redis.test.ts
@@ -29,9 +29,8 @@ describe('redis config', () => {
     getRedisClient()
     mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
 
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
+    await vi.advanceTimersByTimeAsync(15_000)
 
     expect(listener).toHaveBeenCalledTimes(1)
   })
@@ -44,9 +43,9 @@ describe('redis config', () => {
     getRedisClient()
     mockRedisInstance.ping.mockResolvedValue('PONG')
 
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
+    await vi.advanceTimersByTimeAsync(15_000)
+    await vi.advanceTimersByTimeAsync(15_000)
 
     expect(listener).not.toHaveBeenCalled()
   })
@@ -58,34 +57,29 @@ describe('redis config', () => {
     getRedisClient()
 
-    // 2 failures then a success — should reset counter
+    // 1 failure then a success — should reset counter
     mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
-    mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
     mockRedisInstance.ping.mockResolvedValueOnce('PONG')
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
 
-    // 2 more failures — should NOT trigger reconnect (counter was reset)
-    mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
+    // 1 more failure — should NOT trigger reconnect (counter was reset)
     mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
 
     expect(listener).not.toHaveBeenCalled()
   })
 
-  it('should call disconnect(true) after 3 consecutive PING failures', async () => {
+  it('should call disconnect(true) after 2 consecutive PING failures', async () => {
     const { getRedisClient } = await import('./redis')
 
     getRedisClient()
     mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
 
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
 
     expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
 
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
 
     expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
   })
@@ -100,9 +94,8 @@ describe('redis config', () => {
     getRedisClient()
     mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
 
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
+    await vi.advanceTimersByTimeAsync(15_000)
 
     expect(badListener).toHaveBeenCalledTimes(1)
     expect(goodListener).toHaveBeenCalledTimes(1)
@@ -119,7 +112,7 @@ describe('redis config', () => {
     // After closing, PING failures should not trigger disconnect
     mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000 * 5)
+    await vi.advanceTimersByTimeAsync(15_000 * 5)
 
     expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
   })
 })
diff --git a/apps/sim/lib/core/config/redis.ts b/apps/sim/lib/core/config/redis.ts
index 4db71b49b2..74e26da860 100644
--- a/apps/sim/lib/core/config/redis.ts
+++ b/apps/sim/lib/core/config/redis.ts
@@ -11,8 +11,8 @@ let pingFailures = 0
 let pingInterval: NodeJS.Timeout | null = null
 let pingInFlight = false
 
-const PING_INTERVAL_MS = 30_000
-const MAX_PING_FAILURES = 3
+const PING_INTERVAL_MS = 15_000
+const MAX_PING_FAILURES = 2
 
 /** Callbacks invoked when the PING health check forces a reconnect. */
 const reconnectListeners: Array<() => void> = []
@@ -42,7 +42,7 @@ function startPingHealthCheck(redis: Redis): void {
       })
 
       if (pingFailures >= MAX_PING_FAILURES) {
-        logger.error('Redis PING failed 3 consecutive times — forcing reconnect', {
+        logger.error('Redis PING failed too many consecutive times — forcing reconnect', {
           consecutiveFailures: pingFailures,
         })
         pingFailures = 0
diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts
index 7a733e0b42..3da81142f5 100644
--- a/apps/sim/lib/execution/isolated-vm.ts
+++ b/apps/sim/lib/execution/isolated-vm.ts
@@ -71,6 +71,7 @@ const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER =
   MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER
 const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000
 const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner'
+const LEASE_REDIS_DEADLINE_MS = 200
 const QUEUE_RETRY_DELAY_MS = 1000
 const DISTRIBUTED_LEASE_GRACE_MS = 30000
 
@@ -292,21 +293,37 @@ async function tryAcquireDistributedLease(
     return 1
   `
 
-  try {
-    const result = await redis.eval(
-      script,
-      1,
-      key,
-      now.toString(),
-      DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
-      expiresAt.toString(),
-      leaseId,
-      leaseTtlMs.toString()
-    )
+  let deadlineTimer: NodeJS.Timeout | undefined
+  const deadline = new Promise((_, reject) => {
+    deadlineTimer = setTimeout(
+      () => reject(new Error(`Redis lease timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)),
+      LEASE_REDIS_DEADLINE_MS
+    )
+  })
+
+  try {
+    const evalPromise = redis.eval(
+      script,
+      1,
+      key,
+      now.toString(),
+      DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
+      expiresAt.toString(),
+      leaseId,
+      leaseTtlMs.toString()
+    )
+    // Attach a no-op handler so losing the race never leaves an unhandled rejection.
+    evalPromise.catch(() => {})
+    const result = await Promise.race([evalPromise, deadline])
 
     return Number(result) === 1 ? 'acquired' : 'limit_exceeded'
   } catch (error) {
-    logger.error('Failed to acquire distributed owner lease', { ownerKey, error })
+    logger.warn('Failed to acquire distributed owner lease — falling back to local execution', {
+      ownerKey,
+      error,
+    })
     return 'unavailable'
+  } finally {
+    clearTimeout(deadlineTimer)
   }
 }