Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 16 additions & 23 deletions apps/sim/lib/core/config/redis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ describe('redis config', () => {
getRedisClient()

mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(15_000)
await vi.advanceTimersByTimeAsync(15_000)

expect(listener).toHaveBeenCalledTimes(1)
})
Expand All @@ -44,9 +43,9 @@ describe('redis config', () => {
getRedisClient()
mockRedisInstance.ping.mockResolvedValue('PONG')

await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(15_000)
await vi.advanceTimersByTimeAsync(15_000)
await vi.advanceTimersByTimeAsync(15_000)

expect(listener).not.toHaveBeenCalled()
})
Expand All @@ -58,34 +57,29 @@ describe('redis config', () => {

getRedisClient()

// 2 failures then a success — should reset counter
// 1 failure then a success — should reset counter
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(15_000)
mockRedisInstance.ping.mockResolvedValueOnce('PONG')
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(15_000)

// 2 more failures — should NOT trigger reconnect (counter was reset)
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
// 1 more failure — should NOT trigger reconnect (counter was reset)
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(15_000)

expect(listener).not.toHaveBeenCalled()
})

it('should call disconnect(true) after 3 consecutive PING failures', async () => {
it('should call disconnect(true) after 2 consecutive PING failures', async () => {
const { getRedisClient } = await import('./redis')
getRedisClient()

mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(15_000)

expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()

await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(15_000)
expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
})

Expand All @@ -100,9 +94,8 @@ describe('redis config', () => {

getRedisClient()
mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(30_000)
await vi.advanceTimersByTimeAsync(15_000)
await vi.advanceTimersByTimeAsync(15_000)

expect(badListener).toHaveBeenCalledTimes(1)
expect(goodListener).toHaveBeenCalledTimes(1)
Expand All @@ -119,7 +112,7 @@ describe('redis config', () => {

// After closing, PING failures should not trigger disconnect
mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
await vi.advanceTimersByTimeAsync(30_000 * 5)
await vi.advanceTimersByTimeAsync(15_000 * 5)
expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
})
})
Expand Down
6 changes: 3 additions & 3 deletions apps/sim/lib/core/config/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ let pingFailures = 0
let pingInterval: NodeJS.Timeout | null = null
let pingInFlight = false

const PING_INTERVAL_MS = 30_000
const MAX_PING_FAILURES = 3
const PING_INTERVAL_MS = 15_000
const MAX_PING_FAILURES = 2

/** Callbacks invoked when the PING health check forces a reconnect. */
const reconnectListeners: Array<() => void> = []
Expand Down Expand Up @@ -42,7 +42,7 @@ function startPingHealthCheck(redis: Redis): void {
})

if (pingFailures >= MAX_PING_FAILURES) {
logger.error('Redis PING failed 3 consecutive times — forcing reconnect', {
logger.error(`Redis PING failed ${MAX_PING_FAILURES} consecutive times — forcing reconnect`, {
consecutiveFailures: pingFailures,
})
pingFailures = 0
Expand Down
39 changes: 28 additions & 11 deletions apps/sim/lib/execution/isolated-vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER =
MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER
const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000
const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner'
const LEASE_REDIS_DEADLINE_MS = 200
const QUEUE_RETRY_DELAY_MS = 1000
const DISTRIBUTED_LEASE_GRACE_MS = 30000

Expand Down Expand Up @@ -292,21 +293,37 @@ async function tryAcquireDistributedLease(
return 1
`

try {
const result = await redis.eval(
script,
1,
key,
now.toString(),
DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
expiresAt.toString(),
leaseId,
leaseTtlMs.toString()
let deadlineTimer: NodeJS.Timeout | undefined
const deadline = new Promise<never>((_, reject) => {
deadlineTimer = setTimeout(
() => reject(new Error(`Redis lease timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)),
LEASE_REDIS_DEADLINE_MS
)
})

try {
const result = await Promise.race([
redis.eval(
script,
1,
key,
now.toString(),
DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
expiresAt.toString(),
leaseId,
leaseTtlMs.toString()
),
deadline,
])
return Number(result) === 1 ? 'acquired' : 'limit_exceeded'
} catch (error) {
logger.error('Failed to acquire distributed owner lease', { ownerKey, error })
logger.warn('Failed to acquire distributed owner lease — falling back to local execution', {
ownerKey,
error,
})
return 'unavailable'
} finally {
clearTimeout(deadlineTimer)
}
}

Expand Down