diff --git a/.changeset/fix-local-build-load.md b/.changeset/fix-local-build-load.md new file mode 100644 index 00000000000..13f91da9d6a --- /dev/null +++ b/.changeset/fix-local-build-load.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Fix `--load` flag being silently ignored on local/self-hosted builds. diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index faf34bcd025..da7ea5b91f1 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -3,142 +3,169 @@ import { env as stdEnv } from "std-env"; import { z } from "zod"; import { AdditionalEnvVars, BoolEnv } from "./envUtil.js"; -const Env = z.object({ - // This will come from `spec.nodeName` in k8s - TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), - TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), - - // Required settings - TRIGGER_API_URL: z.string().url(), - TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file - MANAGED_WORKER_SECRET: z.string(), - OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners - - // Workload API settings (coordinator mode) - the workload API is what the run controller connects to - TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true), - TRIGGER_WORKLOAD_API_PROTOCOL: z - .string() - .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) - .default("http"), - TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default - TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"), - TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on - TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller - - // Runner settings - RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), - RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), - RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) - RUNNER_PRETTY_LOGS: BoolEnv.default(false), - - // Dequeue settings (provider mode) - TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true), - TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), - TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), - TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1), - TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1), - TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10), - TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"), - TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds - TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds - TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer) - TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current) - TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) - TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) - - // Optional services - TRIGGER_WARM_START_URL: z.string().optional(), - TRIGGER_CHECKPOINT_URL: z.string().optional(), - TRIGGER_METADATA_URL: z.string().optional(), - - // Used by the resource monitor - RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), - RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), - RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), - - // Docker settings - DOCKER_API_VERSION: z.string().optional(), - DOCKER_PLATFORM: z.string().optional(), // e.g. linux/amd64, linux/arm64 - DOCKER_STRIP_IMAGE_DIGEST: BoolEnv.default(true), - DOCKER_REGISTRY_USERNAME: z.string().optional(), - DOCKER_REGISTRY_PASSWORD: z.string().optional(), - DOCKER_REGISTRY_URL: z.string().optional(), // e.g. https://index.docker.io/v1 - DOCKER_ENFORCE_MACHINE_PRESETS: BoolEnv.default(true), - DOCKER_AUTOREMOVE_EXITED_CONTAINERS: BoolEnv.default(true), - /** - * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:`. - * Any other value is taken as a custom network's name to which all runners should connect to. - * - * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. - * - * **WARNING**: Specifying multiple networks will slightly increase startup times. - * - * @default "host" - */ - DOCKER_RUNNER_NETWORKS: z.string().default("host"), - - // Kubernetes settings - KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), - KUBERNETES_NAMESPACE: z.string().default("default"), - KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"), - KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv - KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"), - KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), - KUBERNETES_STRIP_IMAGE_DIGEST: BoolEnv.default(false), - KUBERNETES_CPU_REQUEST_MIN_CORES: z.coerce.number().min(0).default(0), - KUBERNETES_CPU_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(0.75), // Ratio of CPU limit, so 0.75 = 75% of CPU limit - KUBERNETES_MEMORY_REQUEST_MIN_GB: z.coerce.number().min(0).default(0), - KUBERNETES_MEMORY_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(1), // Ratio of memory limit, so 1 = 100% of memory limit - - // Per-preset overrides of the global KUBERNETES_CPU_REQUEST_RATIO - KUBERNETES_CPU_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), - - // Per-preset overrides of the global KUBERNETES_MEMORY_REQUEST_RATIO - KUBERNETES_MEMORY_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), - - KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB - KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods - KUBERNETES_LARGE_MACHINE_POOL_LABEL: z.string().optional(), // if set, large-* presets affinity for machinepool= - - // Project affinity settings - pods from the same project prefer the same node - KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50), - KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z.string().trim().min(1).default("kubernetes.io/hostname"), - - // Placement tags settings - PLACEMENT_TAGS_ENABLED: BoolEnv.default(false), - PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"), - - // Metrics - METRICS_ENABLED: BoolEnv.default(true), - METRICS_COLLECT_DEFAULTS: BoolEnv.default(true), - METRICS_HOST: z.string().default("127.0.0.1"), - METRICS_PORT: z.coerce.number().int().default(9090), - - // Pod cleaner - POD_CLEANER_ENABLED: BoolEnv.default(true), - POD_CLEANER_INTERVAL_MS: z.coerce.number().int().default(10000), - POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500), - - // Failed pod handler - FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true), - FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000), - - // Debug - DEBUG: BoolEnv.default(false), - SEND_RUN_DEBUG_LOGS: BoolEnv.default(false), -}); +const Env = z + .object({ + // This will come from `spec.nodeName` in k8s + TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), + TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), + + // Required settings + TRIGGER_API_URL: z.string().url(), + TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file + MANAGED_WORKER_SECRET: z.string(), + OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners + + // Workload API settings (coordinator mode) - the workload API is what the run controller connects to + TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true), + TRIGGER_WORKLOAD_API_PROTOCOL: z + .string() + .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) + .default("http"), + TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default + TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"), + TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on + TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller + + // Runner settings + RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), + RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), + RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) + RUNNER_PRETTY_LOGS: BoolEnv.default(false), + + // Dequeue settings (provider mode) + TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true), + TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), + TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), + TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10), + TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"), + TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds + TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds + TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer) + TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current) + TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) + TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) + + // Optional services + TRIGGER_WARM_START_URL: z.string().optional(), + TRIGGER_CHECKPOINT_URL: z.string().optional(), + TRIGGER_METADATA_URL: z.string().optional(), + + // Used by the resource monitor + RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), + RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), + RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), + + // Docker settings + DOCKER_API_VERSION: z.string().optional(), + DOCKER_PLATFORM: z.string().optional(), // e.g. linux/amd64, linux/arm64 + DOCKER_STRIP_IMAGE_DIGEST: BoolEnv.default(true), + DOCKER_REGISTRY_USERNAME: z.string().optional(), + DOCKER_REGISTRY_PASSWORD: z.string().optional(), + DOCKER_REGISTRY_URL: z.string().optional(), // e.g. https://index.docker.io/v1 + DOCKER_ENFORCE_MACHINE_PRESETS: BoolEnv.default(true), + DOCKER_AUTOREMOVE_EXITED_CONTAINERS: BoolEnv.default(true), + /** + * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:`. + * Any other value is taken as a custom network's name to which all runners should connect to. + * + * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. + * + * **WARNING**: Specifying multiple networks will slightly increase startup times. + * + * @default "host" + */ + DOCKER_RUNNER_NETWORKS: z.string().default("host"), + + // Compute settings + COMPUTE_GATEWAY_URL: z.string().url().optional(), + COMPUTE_GATEWAY_AUTH_TOKEN: z.string().optional(), + COMPUTE_GATEWAY_TIMEOUT_MS: z.coerce.number().int().default(30_000), + COMPUTE_SNAPSHOTS_ENABLED: BoolEnv.default(false), + + // Kubernetes settings + KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), + KUBERNETES_NAMESPACE: z.string().default("default"), + KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"), + KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv + KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"), + KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), + KUBERNETES_STRIP_IMAGE_DIGEST: BoolEnv.default(false), + KUBERNETES_CPU_REQUEST_MIN_CORES: z.coerce.number().min(0).default(0), + KUBERNETES_CPU_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(0.75), // Ratio of CPU limit, so 0.75 = 75% of CPU limit + KUBERNETES_MEMORY_REQUEST_MIN_GB: z.coerce.number().min(0).default(0), + KUBERNETES_MEMORY_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(1), // Ratio of memory limit, so 1 = 100% of memory limit + + // Per-preset overrides of the global KUBERNETES_CPU_REQUEST_RATIO + KUBERNETES_CPU_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), + + // Per-preset overrides of the global KUBERNETES_MEMORY_REQUEST_RATIO + KUBERNETES_MEMORY_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), + + KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB + KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods + KUBERNETES_LARGE_MACHINE_POOL_LABEL: z.string().optional(), // if set, large-* presets affinity for machinepool= + + // Project affinity settings - pods from the same project prefer the same node + KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50), + KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z + .string() + .trim() + .min(1) + .default("kubernetes.io/hostname"), + + // Placement tags settings + PLACEMENT_TAGS_ENABLED: BoolEnv.default(false), + PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"), + + // Metrics + METRICS_ENABLED: BoolEnv.default(true), + METRICS_COLLECT_DEFAULTS: BoolEnv.default(true), + METRICS_HOST: z.string().default("127.0.0.1"), + METRICS_PORT: z.coerce.number().int().default(9090), + + // Pod cleaner + POD_CLEANER_ENABLED: BoolEnv.default(true), + POD_CLEANER_INTERVAL_MS: z.coerce.number().int().default(10000), + POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500), + + // Failed pod handler + FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true), + FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000), + + // Debug + DEBUG: BoolEnv.default(false), + SEND_RUN_DEBUG_LOGS: BoolEnv.default(false), + }) + .superRefine((data, ctx) => { + if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "TRIGGER_METADATA_URL is required when COMPUTE_SNAPSHOTS_ENABLED is true", + path: ["TRIGGER_METADATA_URL"], + }); + } + if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_WORKLOAD_API_DOMAIN) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "TRIGGER_WORKLOAD_API_DOMAIN is required when COMPUTE_SNAPSHOTS_ENABLED is true", + path: ["TRIGGER_WORKLOAD_API_DOMAIN"], + }); + } + }); export const env = Env.parse(stdEnv); diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 0e274b30390..dd91591aba6 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -14,6 +14,7 @@ import { } from "./resourceMonitor.js"; import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js"; import { DockerWorkloadManager } from "./workloadManager/docker.js"; +import { ComputeWorkloadManager } from "./workloadManager/compute.js"; import { HttpServer, CheckpointClient, @@ -35,9 +36,11 @@ class ManagedSupervisor { private readonly metricsServer?: HttpServer; private readonly workloadServer: WorkloadServer; private readonly workloadManager: WorkloadManager; + private readonly computeManager?: ComputeWorkloadManager; private readonly logger = new SimpleStructuredLogger("managed-supervisor"); private readonly resourceMonitor: ResourceMonitor; private readonly checkpointClient?: CheckpointClient; + private readonly isComputeMode: boolean; private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; @@ -77,9 +80,22 @@ class ManagedSupervisor { : new DockerResourceMonitor(new Docker()) : new NoopResourceMonitor(); - this.workloadManager = this.isKubernetes - ? new KubernetesWorkloadManager(workloadManagerOptions) - : new DockerWorkloadManager(workloadManagerOptions); + this.isComputeMode = !!env.COMPUTE_GATEWAY_URL; + + if (env.COMPUTE_GATEWAY_URL) { + const computeManager = new ComputeWorkloadManager({ + ...workloadManagerOptions, + gatewayUrl: env.COMPUTE_GATEWAY_URL, + gatewayAuthToken: env.COMPUTE_GATEWAY_AUTH_TOKEN, + gatewayTimeoutMs: env.COMPUTE_GATEWAY_TIMEOUT_MS, + }); + this.computeManager = computeManager; + this.workloadManager = computeManager; + } else { + this.workloadManager = this.isKubernetes + ? new KubernetesWorkloadManager(workloadManagerOptions) + : new DockerWorkloadManager(workloadManagerOptions); + } if (this.isKubernetes) { if (env.POD_CLEANER_ENABLED) { @@ -187,7 +203,7 @@ class ManagedSupervisor { this.workloadServer.notifyRun({ run }); }); - this.workerSession.on("runQueueMessage", async ({ time, message }) => { + this.workerSession.on("runQueueMessage", async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => { this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message); if (message.completedWaitpoints.length > 0) { @@ -207,6 +223,33 @@ class ManagedSupervisor { if (checkpoint) { this.logger.log("Restoring run", { runId: message.run.id }); + if (this.isComputeMode && this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { + try { + // Derive runnerId unique per restore cycle (matches iceman's pattern) + const runIdShort = message.run.friendlyId.replace("run_", ""); + const checkpointSuffix = checkpoint.id.slice(-8); + const runnerId = `runner-${runIdShort}-${checkpointSuffix}`; + + const didRestore = await this.computeManager.restore({ + snapshotId: checkpoint.location, + runnerId, + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + machine: message.run.machine, + }); + + if (didRestore) { + this.logger.log("Compute restore successful", { runId: message.run.id, runnerId }); + } else { + this.logger.error("Compute restore failed", { runId: message.run.id, runnerId }); + } + } catch (error) { + this.logger.error("Failed to restore run (compute)", { error }); + } + + return; + } + if (!this.checkpointClient) { this.logger.error("No checkpoint client", { runId: message.run.id }); return; @@ -236,7 +279,9 @@ class ManagedSupervisor { this.logger.log("Scheduling run", { runId: message.run.id }); + const warmStartStart = performance.now(); const didWarmStart = await this.tryWarmStart(message); + const warmStartCheckMs = Math.round(performance.now() - warmStartStart); if (didWarmStart) { this.logger.log("Warm start successful", { runId: message.run.id }); @@ -252,6 +297,9 @@ class ManagedSupervisor { await this.workloadManager.create({ dequeuedAt: message.dequeuedAt, + dequeueResponseMs, + pollingIntervalMs, + warmStartCheckMs, envId: message.environment.id, envType: message.environment.type, image: message.image, @@ -296,6 +344,7 @@ class ManagedSupervisor { host: env.TRIGGER_WORKLOAD_API_HOST_INTERNAL, workerClient: this.workerSession.httpClient, checkpointClient: this.checkpointClient, + computeManager: this.computeManager, }); this.workloadServer.on("runConnected", this.onRunConnected.bind(this)); diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts new file mode 100644 index 00000000000..ea7be55d1a9 --- /dev/null +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -0,0 +1,311 @@ +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import { + type WorkloadManager, + type WorkloadManagerCreateOptions, + type WorkloadManagerOptions, +} from "./types.js"; +import { env } from "../env.js"; +import { getRunnerId } from "../util.js"; +import { tryCatch } from "@trigger.dev/core"; + +type ComputeWorkloadManagerOptions = WorkloadManagerOptions & { + gatewayUrl: string; + gatewayAuthToken?: string; + gatewayTimeoutMs: number; +}; + +export class ComputeWorkloadManager implements WorkloadManager { + private readonly logger = new SimpleStructuredLogger("compute-workload-manager"); + + constructor(private opts: ComputeWorkloadManagerOptions) { + if (opts.workloadApiDomain) { + this.logger.warn("⚠️ Custom workload API domain", { + domain: opts.workloadApiDomain, + }); + } + } + + async create(opts: WorkloadManagerCreateOptions) { + const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); + + const envVars: Record = { + OTEL_EXPORTER_OTLP_ENDPOINT: env.OTEL_EXPORTER_OTLP_ENDPOINT, + TRIGGER_DEQUEUED_AT_MS: String(opts.dequeuedAt.getTime()), + TRIGGER_POD_SCHEDULED_AT_MS: String(Date.now()), + TRIGGER_ENV_ID: opts.envId, + TRIGGER_DEPLOYMENT_ID: opts.deploymentFriendlyId, + TRIGGER_DEPLOYMENT_VERSION: opts.deploymentVersion, + TRIGGER_RUN_ID: opts.runFriendlyId, + TRIGGER_SNAPSHOT_ID: opts.snapshotFriendlyId, + TRIGGER_SUPERVISOR_API_PROTOCOL: this.opts.workloadApiProtocol, + TRIGGER_SUPERVISOR_API_PORT: String(this.opts.workloadApiPort), + TRIGGER_SUPERVISOR_API_DOMAIN: this.opts.workloadApiDomain ?? "", + TRIGGER_WORKER_INSTANCE_NAME: env.TRIGGER_WORKER_INSTANCE_NAME, + TRIGGER_RUNNER_ID: runnerId, + TRIGGER_MACHINE_CPU: String(opts.machine.cpu), + TRIGGER_MACHINE_MEMORY: String(opts.machine.memory), + PRETTY_LOGS: String(env.RUNNER_PRETTY_LOGS), + }; + + if (this.opts.warmStartUrl) { + envVars.TRIGGER_WARM_START_URL = this.opts.warmStartUrl; + } + + if (env.COMPUTE_SNAPSHOTS_ENABLED && this.opts.metadataUrl) { + envVars.TRIGGER_METADATA_URL = this.opts.metadataUrl; + } + + if (this.opts.heartbeatIntervalSeconds) { + envVars.TRIGGER_HEARTBEAT_INTERVAL_SECONDS = String(this.opts.heartbeatIntervalSeconds); + } + + if (this.opts.snapshotPollIntervalSeconds) { + envVars.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS = String( + this.opts.snapshotPollIntervalSeconds + ); + } + + if (this.opts.additionalEnvVars) { + Object.assign(envVars, this.opts.additionalEnvVars); + } + + const headers: Record = { + "Content-Type": "application/json", + }; + + if (this.opts.gatewayAuthToken) { + headers["Authorization"] = `Bearer ${this.opts.gatewayAuthToken}`; + } + + // Strip image digest — resolve by tag, not digest + const imageRef = opts.image.split("@")[0]!; + + const url = `${this.opts.gatewayUrl}/api/instances`; + + // Wide event: single canonical log line emitted in finally + const event: Record = { + // High-cardinality identifiers + runId: opts.runFriendlyId, + runnerId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + // Environment + instanceName: env.TRIGGER_WORKER_INSTANCE_NAME, + // Supervisor timing + dequeueResponseMs: opts.dequeueResponseMs, + pollingIntervalMs: opts.pollingIntervalMs, + warmStartCheckMs: opts.warmStartCheckMs, + // Request + image: imageRef, + url, + }; + + const startMs = performance.now(); + + try { + const [fetchError, response] = await tryCatch( + fetch(url, { + method: "POST", + headers, + signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs), + body: JSON.stringify({ + name: runnerId, + image: imageRef, + env: envVars, + cpu: opts.machine.cpu, + memory_gb: opts.machine.memory, + metadata: { + runId: opts.runFriendlyId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + }, + }), + }) + ); + + if (fetchError) { + event.error = fetchError instanceof Error ? fetchError.message : String(fetchError); + event.errorType = + fetchError instanceof DOMException && fetchError.name === "TimeoutError" + ? "timeout" + : "fetch"; + return; + } + + event.status = response.status; + + if (!response.ok) { + const [bodyError, body] = await tryCatch(response.text()); + event.responseBody = bodyError ? undefined : body; + return; + } + + const [parseError, data] = await tryCatch(response.json()); + + if (parseError) { + event.error = parseError instanceof Error ? parseError.message : String(parseError); + event.errorType = "parse"; + return; + } + + event.instanceId = data.id; + event.ok = true; + } finally { + event.durationMs = Math.round(performance.now() - startMs); + event.ok ??= false; + this.logger.info("create instance", event); + } + } + + private get authHeaders(): Record { + const headers: Record = { + "Content-Type": "application/json", + }; + if (this.opts.gatewayAuthToken) { + headers["Authorization"] = `Bearer ${this.opts.gatewayAuthToken}`; + } + return headers; + } + + async snapshot(opts: { + runnerId: string; + callbackUrl: string; + metadata: Record; + }): Promise { + const url = `${this.opts.gatewayUrl}/api/instances/${opts.runnerId}/snapshot`; + + const [error, response] = await tryCatch( + fetch(url, { + method: "POST", + headers: this.authHeaders, + signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs), + body: JSON.stringify({ + callback: { + url: opts.callbackUrl, + metadata: opts.metadata, + }, + }), + }) + ); + + if (error) { + this.logger.error("snapshot request failed", { + runnerId: opts.runnerId, + error: error instanceof Error ? error.message : String(error), + }); + return false; + } + + if (response.status !== 202) { + this.logger.error("snapshot request rejected", { + runnerId: opts.runnerId, + status: response.status, + }); + return false; + } + + this.logger.info("snapshot request accepted", { runnerId: opts.runnerId }); + return true; + } + + async deleteInstance(runnerId: string): Promise { + const url = `${this.opts.gatewayUrl}/api/instances/${runnerId}`; + + const [error, response] = await tryCatch( + fetch(url, { + method: "DELETE", + headers: this.authHeaders, + signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs), + }) + ); + + if (error) { + this.logger.error("delete instance failed", { + runnerId, + error: error instanceof Error ? error.message : String(error), + }); + return false; + } + + if (!response.ok) { + this.logger.error("delete instance rejected", { + runnerId, + status: response.status, + }); + return false; + } + + this.logger.info("delete instance success", { runnerId }); + return true; + } + + async restore(opts: { + snapshotId: string; + runnerId: string; + runFriendlyId: string; + snapshotFriendlyId: string; + machine: { cpu: number; memory: number }; + }): Promise { + const url = `${this.opts.gatewayUrl}/api/snapshots/${opts.snapshotId}/restore`; + + const metadata: Record = { + TRIGGER_RUNNER_ID: opts.runnerId, + TRIGGER_RUN_ID: opts.runFriendlyId, + TRIGGER_SNAPSHOT_ID: opts.snapshotFriendlyId, + TRIGGER_SUPERVISOR_API_PROTOCOL: this.opts.workloadApiProtocol, + TRIGGER_SUPERVISOR_API_PORT: String(this.opts.workloadApiPort), + TRIGGER_SUPERVISOR_API_DOMAIN: this.opts.workloadApiDomain ?? "", + TRIGGER_WORKER_INSTANCE_NAME: env.TRIGGER_WORKER_INSTANCE_NAME, + }; + + const body = { + name: opts.runnerId, + metadata, + cpu: opts.machine.cpu, + memory_mb: opts.machine.memory * 1024, + }; + + this.logger.debug("restore request body", { url, body }); + + const [error, response] = await tryCatch( + fetch(url, { + method: "POST", + headers: this.authHeaders, + signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs), + body: JSON.stringify(body), + }) + ); + + if (error) { + this.logger.error("restore request failed", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + error: error instanceof Error ? error.message : String(error), + }); + return false; + } + + if (!response.ok) { + this.logger.error("restore request rejected", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + status: response.status, + }); + return false; + } + + this.logger.info("restore request success", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + }); + return true; + } +} diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index 90b61957795..82c7ea7b4c0 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -24,6 +24,10 @@ export interface WorkloadManagerCreateOptions { nextAttemptNumber?: number; dequeuedAt: Date; placementTags?: PlacementTag[]; + // Timing context (populated by supervisor handler, included in wide event) + dequeueResponseMs?: number; + pollingIntervalMs?: number; + warmStartCheckMs?: number; // identifiers envId: string; envType: EnvironmentType; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 35d53d36099..07801682bc0 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -24,6 +24,7 @@ import { HttpServer, type CheckpointClient } from "@trigger.dev/core/v3/serverOn import { type IncomingMessage } from "node:http"; import { register } from "../metrics.js"; import { env } from "../env.js"; +import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; // Use the official export when upgrading to socket.io@4.8.0 interface DefaultEventsMap { @@ -53,15 +54,25 @@ type WorkloadServerEvents = { ]; }; +const ComputeSnapshotCallbackBody = z.object({ + snapshot_id: z.string(), + instance_id: z.string(), + status: z.enum(["completed", "failed"]), + error: z.string().optional(), + metadata: z.record(z.string()).optional(), +}); + type WorkloadServerOptions = { port: number; host?: string; workerClient: SupervisorHttpClient; checkpointClient?: CheckpointClient; + computeManager?: ComputeWorkloadManager; }; export class WorkloadServer extends EventEmitter { private checkpointClient?: CheckpointClient; + private computeManager?: ComputeWorkloadManager; private readonly logger = new SimpleStructuredLogger("workload-server"); @@ -93,6 +104,7 @@ export class WorkloadServer extends EventEmitter { this.workerClient = opts.workerClient; this.checkpointClient = opts.checkpointClient; + this.computeManager = opts.computeManager; this.httpServer = this.createHttpServer({ host, port }); this.websocketServer = this.createWebsocketServer(); @@ -231,11 +243,19 @@ export class WorkloadServer extends EventEmitter { handler: async ({ reply, params, req }) => { this.logger.debug("Suspend request", { params, headers: req.headers }); - if (!this.checkpointClient) { + const runnerId = this.runnerIdFromRequest(req); + const deploymentVersion = this.deploymentVersionFromRequest(req); + const projectRef = this.projectRefFromRequest(req); + + if (!runnerId || !deploymentVersion || !projectRef) { + this.logger.error("Invalid headers for suspend request", { + ...params, + headers: req.headers, + }); reply.json( { ok: false, - error: "Checkpoints disabled", + error: "Invalid headers", } satisfies WorkloadSuspendRunResponseBody, false, 400 @@ -243,19 +263,41 @@ export class WorkloadServer extends EventEmitter { return; } - const runnerId = this.runnerIdFromRequest(req); - const deploymentVersion = this.deploymentVersionFromRequest(req); - const projectRef = this.projectRefFromRequest(req); + if (this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) { + if (!env.TRIGGER_WORKLOAD_API_DOMAIN) { + this.logger.error( + "TRIGGER_WORKLOAD_API_DOMAIN is not set, cannot create snapshot callback URL" + ); + reply.json({ error: "Snapshot callbacks not configured" }, false, 500); + return; + } - if (!runnerId || !deploymentVersion || !projectRef) { - this.logger.error("Invalid headers for suspend request", { - ...params, - headers: req.headers, + // Compute mode: fire-and-forget snapshot with callback + reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202); + + const callbackUrl = `${env.TRIGGER_WORKLOAD_API_PROTOCOL}://${env.TRIGGER_WORKLOAD_API_DOMAIN}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}/api/v1/compute/snapshot-complete`; + + const snapshotResult = await this.computeManager.snapshot({ + runnerId, + callbackUrl, + metadata: { + runId: params.runFriendlyId, + snapshotFriendlyId: params.snapshotFriendlyId, + }, }); + + if (!snapshotResult) { + this.logger.error("Failed to request compute snapshot", { params, runnerId }); + } + + return; + } + + if (!this.checkpointClient) { reply.json( { ok: false, - error: "Invalid headers", + error: "Checkpoints disabled", } satisfies WorkloadSuspendRunResponseBody, false, 400 @@ -394,6 +436,76 @@ export class WorkloadServer extends EventEmitter { }); } + // Compute snapshot callback endpoint + httpServer.route("/api/v1/compute/snapshot-complete", "POST", { + bodySchema: ComputeSnapshotCallbackBody, + handler: async ({ reply, body }) => { + this.logger.info("Compute snapshot callback", { + snapshotId: body.snapshot_id, + instanceId: body.instance_id, + status: body.status, + error: body.error, + metadata: body.metadata, + }); + + const runId = body.metadata?.runId; + const snapshotFriendlyId = body.metadata?.snapshotFriendlyId; + + if (!runId || !snapshotFriendlyId) { + this.logger.error("Compute snapshot callback missing metadata", { body }); + reply.empty(400); + return; + } + + if (body.status === "completed") { + const result = await this.workerClient.submitSuspendCompletion({ + runId, + snapshotId: snapshotFriendlyId, + body: { + success: true, + checkpoint: { + type: "KUBERNETES", + location: body.snapshot_id, + }, + }, + }); + + if (result.success) { + this.logger.info("Suspend completion submitted, deleting instance", { + runId, + instanceId: body.instance_id, + }); + await this.computeManager?.deleteInstance(body.instance_id); + } else { + this.logger.error("Failed to submit suspend completion", { + runId, + snapshotFriendlyId, + error: result.error, + }); + } + } else { + const result = await this.workerClient.submitSuspendCompletion({ + runId, + snapshotId: snapshotFriendlyId, + body: { + success: false, + error: body.error ?? "Snapshot failed", + }, + }); + + if (!result.success) { + this.logger.error("Failed to submit suspend failure", { + runId, + snapshotFriendlyId, + error: result.error, + }); + } + } + + reply.empty(200); + }, + }); + return httpServer; } diff --git a/packages/cli-v3/src/deploy/buildImage.ts b/packages/cli-v3/src/deploy/buildImage.ts index 2225d7db056..31a2b658545 100644 --- a/packages/cli-v3/src/deploy/buildImage.ts +++ b/packages/cli-v3/src/deploy/buildImage.ts @@ -205,6 +205,7 @@ async function remoteBuildImage(options: DepotBuildImageOptions): Promise { + onDequeue: async (messages, timing) => { // Always update queue length, default to 0 for empty dequeues or missing value this.updateQueueLength(messages[0]?.workerQueueLength ?? 0); // Forward to the original handler - await this.consumerOptions.onDequeue(messages); + await this.consumerOptions.onDequeue(messages, timing); }, }); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/events.ts b/packages/core/src/v3/runEngineWorker/supervisor/events.ts index a51c504a3e6..df4a93686a9 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/events.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/events.ts @@ -6,6 +6,8 @@ export type WorkerEvents = { { time: Date; message: DequeuedMessage; + dequeueResponseMs?: number; + pollingIntervalMs?: number; }, ]; requestRunAttemptStart: [ diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index 4379eb54f37..76faee40809 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -15,7 +15,7 @@ export type RunQueueConsumerOptions = { preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; - onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; + onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise; }; export class RunQueueConsumer implements QueueConsumer { @@ -23,13 +23,14 @@ export class RunQueueConsumer implements QueueConsumer { private readonly preDequeue?: PreDequeueFn; private readonly preSkip?: PreSkipFn; private readonly maxRunCount?: number; - private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; + private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise; private readonly logger = new SimpleStructuredLogger("queue-consumer"); private intervalMs: number; private idleIntervalMs: number; private isEnabled: boolean; + private lastScheduledIntervalMs: number; constructor(opts: RunQueueConsumerOptions) { this.isEnabled = false; @@ -38,6 +39,7 @@ export class RunQueueConsumer implements QueueConsumer { this.preDequeue = opts.preDequeue; this.preSkip = opts.preSkip; this.maxRunCount = opts.maxRunCount; + this.lastScheduledIntervalMs = opts.idleIntervalMs; this.onDequeue = opts.onDequeue; this.client = opts.client; } @@ -111,16 +113,18 @@ export class RunQueueConsumer implements QueueConsumer { let nextIntervalMs = this.idleIntervalMs; try { + const dequeueStart = performance.now(); const response = await this.client.dequeue({ maxResources: preDequeueResult?.maxResources, maxRunCount: this.maxRunCount, }); + const dequeueResponseMs = Math.round(performance.now() - dequeueStart); if (!response.success) { this.logger.error("Failed to dequeue", { error: response.error }); } else { try { - await this.onDequeue(response.data); + await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs }); if (response.data.length > 0) { nextIntervalMs = this.intervalMs; @@ -141,6 +145,7 @@ export class RunQueueConsumer implements QueueConsumer { this.logger.verbose("scheduled dequeue with idle interval", { delayMs }); } + this.lastScheduledIntervalMs = delayMs; setTimeout(this.dequeue.bind(this), delayMs); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index e5a783b8d41..b2d344fb3dc 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -80,13 +80,15 @@ export class SupervisorSession extends EventEmitter { }); } - private async onDequeue(messages: WorkerApiDequeueResponseBody): Promise { + private async onDequeue(messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }): Promise { this.logger.verbose("Dequeued messages with contents", { count: messages.length, messages }); for (const message of messages) { this.emit("runQueueMessage", { time: new Date(), message, + dequeueResponseMs: timing?.dequeueResponseMs, + pollingIntervalMs: timing?.pollingIntervalMs, }); } }