Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ const EnvironmentSchema = z
RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100),
RUN_ENGINE_TTL_WORKER_CONCURRENCY: z.coerce.number().int().default(1),
RUN_ENGINE_TTL_WORKER_BATCH_MAX_SIZE: z.coerce.number().int().default(50),
RUN_ENGINE_TTL_CONSUMERS_DISABLED: BoolEnv.default(false),
RUN_ENGINE_TTL_WORKER_BATCH_MAX_WAIT_MS: z.coerce.number().int().default(5_000),

/** Optional maximum TTL for all runs (e.g. "14d"). If set, runs without an explicit TTL
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ function createRunEngine() {
},
ttlSystem: {
disabled: env.RUN_ENGINE_TTL_SYSTEM_DISABLED,
consumersDisabled: env.RUN_ENGINE_TTL_CONSUMERS_DISABLED,
shardCount: env.RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT,
pollIntervalMs: env.RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS,
batchSize: env.RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE,
Expand Down
3 changes: 2 additions & 1 deletion internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ export class RunEngine {
shardCount: options.queue?.ttlSystem?.shardCount,
pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs,
batchSize: options.queue?.ttlSystem?.batchSize,
consumersDisabled: options.queue?.ttlSystem?.consumersDisabled,
workerQueueSuffix: "ttl-worker:{queue:ttl-expiration:}queue",
workerItemsSuffix: "ttl-worker:{queue:ttl-expiration:}items",
visibilityTimeoutMs: options.queue?.ttlSystem?.visibilityTimeoutMs ?? 30_000,
Expand Down Expand Up @@ -368,7 +369,7 @@ export class RunEngine {

// Start TTL worker whenever TTL system is enabled, so expired runs enqueued by the
// Lua script get processed even when the main engine worker is disabled (e.g. in tests).
if (options.queue?.ttlSystem && !options.queue.ttlSystem.disabled) {
if (options.queue?.ttlSystem && !options.queue.ttlSystem.disabled && !options.queue.ttlSystem.consumersDisabled) {
this.ttlWorker.start();
}

Expand Down
5 changes: 4 additions & 1 deletion internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ export type RunEngineOptions = {
pollIntervalMs?: number;
/** Max number of runs to expire per poll per shard (default: 100) */
batchSize?: number;
/** Whether TTL consumers are disabled (default: false) */
/** Whether the entire TTL system is disabled (default: false) */
disabled?: boolean;
/** Whether TTL consumers + worker are disabled on this instance (default: false).
* When true, ZADD on enqueue still happens but polling loops and the TTL worker don't run. */
consumersDisabled?: boolean;
/** Visibility timeout for TTL worker jobs (ms, default: 120000) */
visibilityTimeoutMs?: number;
/** Concurrency limit for the TTL redis-worker (default: 1) */
Expand Down
7 changes: 7 additions & 0 deletions internal-packages/run-engine/src/run-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ export type RunQueueOptions = {
pollIntervalMs?: number;
/** Max number of runs to expire per poll per shard (default: 100) */
batchSize?: number;
/** Whether TTL consumers (polling loops) are disabled on this instance (default: false) */
consumersDisabled?: boolean;
/** Key suffix for TTL worker's queue sorted set (relative to RunQueue keyPrefix) */
workerQueueSuffix: string;
/** Key suffix for TTL worker's items hash (relative to RunQueue keyPrefix) */
Expand Down Expand Up @@ -1243,6 +1245,11 @@ export class RunQueue {
return;
}

if (this.options.ttlSystem.consumersDisabled) {
this.logger.debug("TTL consumers disabled on this instance");
return;
}

const shardCount = this.options.ttlSystem.shardCount ?? this.shardCount;

for (let i = 0; i < shardCount; i++) {
Expand Down
Loading