From e23326cd1ccf2bd0f89fe5dcfc59b2ae6a02dba9 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 23 Feb 2026 16:33:55 +0000 Subject: [PATCH] fix(engine): allow disabling the ttl system consumers independently from the whole system --- apps/webapp/app/env.server.ts | 1 + apps/webapp/app/v3/runEngine.server.ts | 1 + internal-packages/run-engine/src/engine/index.ts | 3 ++- internal-packages/run-engine/src/engine/types.ts | 5 ++++- internal-packages/run-engine/src/run-queue/index.ts | 7 +++++++ 5 files changed, 15 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index d4ea1728b3..635819bde4 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -613,6 +613,7 @@ const EnvironmentSchema = z RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100), RUN_ENGINE_TTL_WORKER_CONCURRENCY: z.coerce.number().int().default(1), RUN_ENGINE_TTL_WORKER_BATCH_MAX_SIZE: z.coerce.number().int().default(50), + RUN_ENGINE_TTL_CONSUMERS_DISABLED: BoolEnv.default(false), RUN_ENGINE_TTL_WORKER_BATCH_MAX_WAIT_MS: z.coerce.number().int().default(5_000), /** Optional maximum TTL for all runs (e.g. "14d"). If set, runs without an explicit TTL diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index cf7cc4e5aa..037f7c6dce 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -82,6 +82,7 @@ function createRunEngine() { }, ttlSystem: { disabled: env.RUN_ENGINE_TTL_SYSTEM_DISABLED, + consumersDisabled: env.RUN_ENGINE_TTL_CONSUMERS_DISABLED, shardCount: env.RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT, pollIntervalMs: env.RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS, batchSize: env.RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 04c69aecf5..846252398e 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -195,6 +195,7 @@ export class RunEngine { shardCount: options.queue?.ttlSystem?.shardCount, pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs, batchSize: options.queue?.ttlSystem?.batchSize, + consumersDisabled: options.queue?.ttlSystem?.consumersDisabled, workerQueueSuffix: "ttl-worker:{queue:ttl-expiration:}queue", workerItemsSuffix: "ttl-worker:{queue:ttl-expiration:}items", visibilityTimeoutMs: options.queue?.ttlSystem?.visibilityTimeoutMs ?? 30_000, @@ -368,7 +369,7 @@ export class RunEngine { // Start TTL worker whenever TTL system is enabled, so expired runs enqueued by the // Lua script get processed even when the main engine worker is disabled (e.g. in tests). - if (options.queue?.ttlSystem && !options.queue.ttlSystem.disabled) { + if (options.queue?.ttlSystem && !options.queue.ttlSystem.disabled && !options.queue.ttlSystem.consumersDisabled) { this.ttlWorker.start(); } diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index e7108742a5..d0b12320f4 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -71,8 +71,11 @@ export type RunEngineOptions = { pollIntervalMs?: number; /** Max number of runs to expire per poll per shard (default: 100) */ batchSize?: number; - /** Whether TTL consumers are disabled (default: false) */ + /** Whether the entire TTL system is disabled (default: false) */ disabled?: boolean; + /** Whether TTL consumers + worker are disabled on this instance (default: false). + * When true, ZADD on enqueue still happens but polling loops and the TTL worker don't run. */ + consumersDisabled?: boolean; /** Visibility timeout for TTL worker jobs (ms, default: 120000) */ visibilityTimeoutMs?: number; /** Concurrency limit for the TTL redis-worker (default: 1) */ diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index e2ca18ed2c..7ebfaf660d 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -100,6 +100,8 @@ export type RunQueueOptions = { pollIntervalMs?: number; /** Max number of runs to expire per poll per shard (default: 100) */ batchSize?: number; + /** Whether TTL consumers (polling loops) are disabled on this instance (default: false) */ + consumersDisabled?: boolean; /** Key suffix for TTL worker's queue sorted set (relative to RunQueue keyPrefix) */ workerQueueSuffix: string; /** Key suffix for TTL worker's items hash (relative to RunQueue keyPrefix) */ @@ -1243,6 +1245,11 @@ export class RunQueue { return; } + if (this.options.ttlSystem.consumersDisabled) { + this.logger.debug("TTL consumers disabled on this instance"); + return; + } + const shardCount = this.options.ttlSystem.shardCount ?? this.shardCount; for (let i = 0; i < shardCount; i++) {