From b728b235d612be604f40a805d2e33653d0244bc8 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 13 May 2026 16:54:15 +0100 Subject: [PATCH 1/2] fix(webapp): auto-recover replication services after stream errors When the underlying logical-replication client errored (e.g. after a Postgres failover), the runs and sessions replication services logged the error and left the stream stopped. The host process kept running, the WAL backed up, and ClickHouse silently fell behind. Both services now run a configurable recovery strategy on stream errors, defaulting to in-process reconnect with exponential backoff so a fresh self-hosted setup heals on its own: - "reconnect" (default) re-subscribes via the existing subscribe(lastLsn) path with exponential backoff (1s -> 60s cap, unlimited attempts), which re-validates the publication, re-acquires the leader lock, and resumes from the last acknowledged LSN. - "exit" calls process.exit after a short flush window so a host's supervisor (Docker restart=always, systemd, k8s, etc.) can replace the process. - "log" preserves the historical behaviour. Per-service strategy + exit knobs are env-driven via RUN_REPLICATION_ERROR_STRATEGY / SESSION_REPLICATION_ERROR_STRATEGY plus matching *_EXIT_DELAY_MS / *_EXIT_CODE. Reconnect tuning is shared across both services via REPLICATION_RECONNECT_INITIAL_DELAY_MS / _MAX_DELAY_MS / _MAX_ATTEMPTS (0 = unlimited). --- .server-changes/replication-error-recovery.md | 6 + apps/webapp/app/env.server.ts | 23 ++ .../replicationErrorRecovery.server.ts | 190 +++++++++++ .../runsReplicationInstance.server.ts | 9 + .../services/runsReplicationService.server.ts | 21 ++ .../sessionsReplicationInstance.server.ts | 9 + .../sessionsReplicationService.server.ts | 21 ++ ...nsReplicationService.errorRecovery.test.ts | 303 ++++++++++++++++++ 8 files changed, 582 insertions(+) create mode 100644 .server-changes/replication-error-recovery.md create mode 100644 apps/webapp/app/services/replicationErrorRecovery.server.ts create mode 100644 apps/webapp/test/runsReplicationService.errorRecovery.test.ts diff --git a/.server-changes/replication-error-recovery.md b/.server-changes/replication-error-recovery.md new file mode 100644 index 00000000000..f5c8dfc6223 --- /dev/null +++ b/.server-changes/replication-error-recovery.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Runs and sessions replication services now auto-recover from stream errors (e.g. after a Postgres failover) instead of silently leaving replication stopped. Behaviour is configurable per service — reconnect (default), exit so a process supervisor can restart the host, or log. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 97cccbc1710..31589faaf96 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1306,6 +1306,16 @@ const EnvironmentSchema = z RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"), RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"), RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"), + // What to do when the runs replication client errors (e.g. after a + // Postgres failover). `reconnect` (default) re-subscribes in-process with + // exponential backoff; `exit` exits the process so a supervisor restarts + // it; `log` preserves the old no-op behaviour. Reconnect tuning is + // shared across both replication services via REPLICATION_RECONNECT_*. + RUN_REPLICATION_ERROR_STRATEGY: z + .enum(["reconnect", "exit", "log"]) + .default("reconnect"), + RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000), + RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1), // Session replication (Postgres → ClickHouse sessions_v1). Shares Redis // with the runs replicator for leader locking but has its own slot and @@ -1338,6 +1348,19 @@ const EnvironmentSchema = z SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3), SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100), SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000), + // Error recovery — same semantics as RUN_REPLICATION_ERROR_STRATEGY. + SESSION_REPLICATION_ERROR_STRATEGY: z + .enum(["reconnect", "exit", "log"]) + .default("reconnect"), + SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000), + SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1), + + // Reconnect tuning shared across both replication services. Only + // applies when error strategy is `reconnect`. Max attempts of 0 means + // unlimited (default). + REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().default(1_000), + REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().default(60_000), + REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().default(0), // Clickhouse CLICKHOUSE_URL: z.string(), diff --git a/apps/webapp/app/services/replicationErrorRecovery.server.ts b/apps/webapp/app/services/replicationErrorRecovery.server.ts new file mode 100644 index 00000000000..ea3f2c0bf47 --- /dev/null +++ b/apps/webapp/app/services/replicationErrorRecovery.server.ts @@ -0,0 +1,190 @@ +import { Logger } from "@trigger.dev/core/logger"; + +// When the LogicalReplicationClient's WAL stream errors (e.g. after a +// Postgres failover) it calls stop() on itself and stays stopped. The host +// service has to decide how to recover. Three strategies are available: +// +// - "reconnect" — re-subscribe in-process with exponential backoff. Default; +// works without a process supervisor. +// - "exit" — exit the process so an external supervisor (Docker +// restart=always, ECS, systemd, k8s, ...) replaces it. Recommended when a +// supervisor is present because it gets a clean slate every time. +// - "log" — preserve the historical no-op behaviour. Useful for +// debugging or in test environments where you want to observe the +// silent-death failure mode. +export type ReplicationErrorRecoveryStrategy = + | { + type: "reconnect"; + initialDelayMs?: number; + maxDelayMs?: number; + // 0 (or undefined) means retry forever. + maxAttempts?: number; + } + | { + type: "exit"; + exitDelayMs?: number; + exitCode?: number; + } + | { type: "log" }; + +export interface ReplicationErrorRecoveryDeps { + strategy: ReplicationErrorRecoveryStrategy; + logger: Logger; + // Re-subscribe the underlying replication client. Implementations should + // call client.subscribe(lastAcknowledgedLsn) and resolve once that returns. + reconnect: () => Promise; + // True once the host service has begun graceful shutdown — recovery + // suppresses all work in that state. + isShuttingDown: () => boolean; +} + +export interface ReplicationErrorRecovery { + // Called from the replication client's "error" event handler. + handle(error: unknown): void; + // Called from the replication client's "start" event handler. Resets the + // reconnect attempt counter so the next failure starts from initialDelayMs. + notifyStreamStarted(): void; + // Cancel any pending reconnect/exit timer. Called from shutdown(). + dispose(): void; +} + +export function createReplicationErrorRecovery( + deps: ReplicationErrorRecoveryDeps +): ReplicationErrorRecovery { + const { strategy, logger, reconnect, isShuttingDown } = deps; + let attempt = 0; + let pendingReconnect: NodeJS.Timeout | null = null; + let pendingExit: NodeJS.Timeout | null = null; + let exiting = false; + + function scheduleReconnect(error: unknown): void { + if (strategy.type !== "reconnect") return; + if (pendingReconnect) return; + + attempt += 1; + const maxAttempts = strategy.maxAttempts ?? 0; + if (maxAttempts > 0 && attempt > maxAttempts) { + logger.error("Replication reconnect exceeded maxAttempts; giving up", { + attempt, + maxAttempts, + error, + }); + return; + } + + const initialDelay = strategy.initialDelayMs ?? 1_000; + const maxDelay = strategy.maxDelayMs ?? 60_000; + const delay = Math.min(initialDelay * Math.pow(2, attempt - 1), maxDelay); + + logger.error("Replication stream lost — scheduling reconnect", { + attempt, + delayMs: delay, + error, + }); + + pendingReconnect = setTimeout(async () => { + pendingReconnect = null; + if (isShuttingDown()) return; + + try { + await reconnect(); + // Success path is handled by notifyStreamStarted, which fires from + // the replication client's "start" event after the stream is live. + } catch (err) { + // subscribe() emits an "error" event of its own on failure, so the + // next attempt is scheduled by handle(). Log here anyway so reconnect + // failures stay visible even if the error event is suppressed. + logger.error("Replication reconnect attempt failed", { + attempt, + error: err, + }); + } + }, delay); + } + + function scheduleExit(): void { + if (strategy.type !== "exit") return; + if (exiting) return; + exiting = true; + + const delay = strategy.exitDelayMs ?? 5_000; + const code = strategy.exitCode ?? 1; + + logger.error("Fatal replication error — exiting to let process supervisor restart", { + exitCode: code, + exitDelayMs: delay, + }); + + pendingExit = setTimeout(() => { + // eslint-disable-next-line no-process-exit + process.exit(code); + }, delay); + // Don't hold a clean shutdown back on this timer. + pendingExit.unref(); + } + + return { + handle(error) { + if (isShuttingDown()) return; + switch (strategy.type) { + case "log": + return; + case "exit": + return scheduleExit(); + case "reconnect": + return scheduleReconnect(error); + } + }, + notifyStreamStarted() { + if (attempt > 0) { + logger.info("Replication reconnect succeeded", { attempt }); + attempt = 0; + } + }, + dispose() { + if (pendingReconnect) { + clearTimeout(pendingReconnect); + pendingReconnect = null; + } + if (pendingExit) { + clearTimeout(pendingExit); + pendingExit = null; + } + }, + }; +} + +// Shape of the env-driven configuration object the instance bootstrap files +// build from process.env. Kept separate from the strategy union above so the +// instance code can pass a single object regardless of which strategy is set. +export type ReplicationErrorRecoveryEnv = { + strategy: "reconnect" | "exit" | "log"; + reconnectInitialDelayMs?: number; + reconnectMaxDelayMs?: number; + reconnectMaxAttempts?: number; + exitDelayMs?: number; + exitCode?: number; +}; + +export function strategyFromEnv( + env: ReplicationErrorRecoveryEnv +): ReplicationErrorRecoveryStrategy { + switch (env.strategy) { + case "exit": + return { + type: "exit", + exitDelayMs: env.exitDelayMs, + exitCode: env.exitCode, + }; + case "log": + return { type: "log" }; + case "reconnect": + default: + return { + type: "reconnect", + initialDelayMs: env.reconnectInitialDelayMs, + maxDelayMs: env.reconnectMaxDelayMs, + maxAttempts: env.reconnectMaxAttempts, + }; + } +} diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 0a8ab5e1bde..a614793ccc9 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -3,6 +3,7 @@ import invariant from "tiny-invariant"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { meter, provider } from "~/v3/tracer.server"; +import { strategyFromEnv } from "./replicationErrorRecovery.server"; import { RunsReplicationService } from "./runsReplicationService.server"; import { signalsEmitter } from "./signals.server"; @@ -69,6 +70,14 @@ function initializeRunsReplicationInstance() { insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY, disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1", disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1", + errorRecovery: strategyFromEnv({ + strategy: env.RUN_REPLICATION_ERROR_STRATEGY, + reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS, + reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS, + reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS, + exitDelayMs: env.RUN_REPLICATION_EXIT_DELAY_MS, + exitCode: env.RUN_REPLICATION_EXIT_CODE, + }), }); if (env.RUN_REPLICATION_ENABLED === "1") { diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 7930c05481f..382e7928655 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -29,6 +29,11 @@ import EventEmitter from "node:events"; import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; +import { + createReplicationErrorRecovery, + type ReplicationErrorRecovery, + type ReplicationErrorRecoveryStrategy, +} from "./replicationErrorRecovery.server"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -73,6 +78,9 @@ export type RunsReplicationServiceOptions = { insertMaxDelayMs?: number; disablePayloadInsert?: boolean; disableErrorFingerprinting?: boolean; + // What to do when the replication client errors (e.g. after a Postgres + // failover). Defaults to in-process reconnect with exponential backoff. + errorRecovery?: ReplicationErrorRecoveryStrategy; }; type PostgresTaskRun = TaskRun & { masterQueue: string }; @@ -119,6 +127,7 @@ export class RunsReplicationService { private _insertStrategy: "insert" | "insert_async"; private _disablePayloadInsert: boolean; private _disableErrorFingerprinting: boolean; + private _errorRecovery: ReplicationErrorRecovery; // Metrics private _replicationLagHistogram: Histogram; @@ -250,14 +259,25 @@ export class RunsReplicationService { } }); + this._errorRecovery = createReplicationErrorRecovery({ + strategy: options.errorRecovery ?? { type: "reconnect" }, + logger: this.logger, + reconnect: async () => { + await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); + }, + isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, + }); + this._replicationClient.events.on("error", (error) => { this.logger.error("Replication client error", { error, }); + this._errorRecovery.handle(error); }); this._replicationClient.events.on("start", () => { this.logger.info("Replication client started"); + this._errorRecovery.notifyStreamStarted(); }); this._replicationClient.events.on("acknowledge", ({ lsn }) => { @@ -278,6 +298,7 @@ export class RunsReplicationService { if (this._isShuttingDown) return; this._isShuttingDown = true; + this._errorRecovery.dispose(); this.logger.info("Initiating shutdown of runs replication service"); diff --git a/apps/webapp/app/services/sessionsReplicationInstance.server.ts b/apps/webapp/app/services/sessionsReplicationInstance.server.ts index c6ed1b6b088..5954d50df57 100644 --- a/apps/webapp/app/services/sessionsReplicationInstance.server.ts +++ b/apps/webapp/app/services/sessionsReplicationInstance.server.ts @@ -3,6 +3,7 @@ import invariant from "tiny-invariant"; import { env } from "~/env.server"; import { singleton } from "~/utils/singleton"; import { meter, provider } from "~/v3/tracer.server"; +import { strategyFromEnv } from "./replicationErrorRecovery.server"; import { SessionsReplicationService } from "./sessionsReplicationService.server"; export const sessionsReplicationInstance = singleton( @@ -66,6 +67,14 @@ function initializeSessionsReplicationInstance() { insertBaseDelayMs: env.SESSION_REPLICATION_INSERT_BASE_DELAY_MS, insertMaxDelayMs: env.SESSION_REPLICATION_INSERT_MAX_DELAY_MS, insertStrategy: env.SESSION_REPLICATION_INSERT_STRATEGY, + errorRecovery: strategyFromEnv({ + strategy: env.SESSION_REPLICATION_ERROR_STRATEGY, + reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS, + reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS, + reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS, + exitDelayMs: env.SESSION_REPLICATION_EXIT_DELAY_MS, + exitCode: env.SESSION_REPLICATION_EXIT_CODE, + }), }); return service; diff --git a/apps/webapp/app/services/sessionsReplicationService.server.ts b/apps/webapp/app/services/sessionsReplicationService.server.ts index f7f384faffc..2465bd11de8 100644 --- a/apps/webapp/app/services/sessionsReplicationService.server.ts +++ b/apps/webapp/app/services/sessionsReplicationService.server.ts @@ -23,6 +23,11 @@ import { tryCatch } from "@trigger.dev/core/utils"; import { type Session } from "@trigger.dev/database"; import EventEmitter from "node:events"; import { ConcurrentFlushScheduler } from "./runsReplicationService.server"; +import { + createReplicationErrorRecovery, + type ReplicationErrorRecovery, + type ReplicationErrorRecoveryStrategy, +} from "./replicationErrorRecovery.server"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -65,6 +70,9 @@ export type SessionsReplicationServiceOptions = { insertMaxRetries?: number; insertBaseDelayMs?: number; insertMaxDelayMs?: number; + // What to do when the replication client errors (e.g. after a Postgres + // failover). Defaults to in-process reconnect with exponential backoff. + errorRecovery?: ReplicationErrorRecoveryStrategy; }; type SessionInsert = { @@ -105,6 +113,7 @@ export class SessionsReplicationService { private _insertBaseDelayMs: number; private _insertMaxDelayMs: number; private _insertStrategy: "insert" | "insert_async"; + private _errorRecovery: ReplicationErrorRecovery; // Metrics private _replicationLagHistogram: Histogram; @@ -231,14 +240,25 @@ export class SessionsReplicationService { } }); + this._errorRecovery = createReplicationErrorRecovery({ + strategy: options.errorRecovery ?? { type: "reconnect" }, + logger: this.logger, + reconnect: async () => { + await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined); + }, + isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete, + }); + this._replicationClient.events.on("error", (error) => { this.logger.error("Replication client error", { error, }); + this._errorRecovery.handle(error); }); this._replicationClient.events.on("start", () => { this.logger.info("Replication client started"); + this._errorRecovery.notifyStreamStarted(); }); this._replicationClient.events.on("acknowledge", ({ lsn }) => { @@ -259,6 +279,7 @@ export class SessionsReplicationService { if (this._isShuttingDown) return; this._isShuttingDown = true; + this._errorRecovery.dispose(); this.logger.info("Initiating shutdown of sessions replication service"); diff --git a/apps/webapp/test/runsReplicationService.errorRecovery.test.ts b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts new file mode 100644 index 00000000000..be3965ebebe --- /dev/null +++ b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts @@ -0,0 +1,303 @@ +import { ClickHouse } from "@internal/clickhouse"; +import { containerTest } from "@internal/testcontainers"; +import { setTimeout } from "node:timers/promises"; +import { z } from "zod"; +import { RunsReplicationService } from "~/services/runsReplicationService.server"; + +vi.setConfig({ testTimeout: 120_000 }); + +// These tests force a replication-stream disconnect (the same shape Postgres +// reports during an RDS failover) and verify each error-recovery strategy +// behaves correctly: +// - "reconnect" (default) auto-resubscribes and resumes from the last LSN +// - "exit" exits the process so a supervisor restarts it +// - "log" keeps historical behaviour (silent death of the stream) +describe("RunsReplicationService error recovery", () => { + containerTest( + "reconnect strategy auto-recovers after the replication backend is killed", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + // Tight backoff so the test doesn't wait minutes. + errorRecovery: { + type: "reconnect", + initialDelayMs: 200, + maxDelayMs: 1000, + }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + // Insert a row pre-failure and verify it replicates. + const runA = await createTaskRun(prisma, seed, "run_pre_failover"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + // Kill the WAL sender backend — same shape as the RDS failover that + // dropped both replication clients on test cloud. + await killReplicationBackend(prisma, "runs-replication"); + + // Insert a row after the kill. With the reconnect strategy the + // service should automatically resubscribe and pick this up. + const runB = await createTaskRun(prisma, seed, "run_post_failover"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id, runB.id], { timeoutMs: 30_000 }); + } finally { + await service.shutdown(); + } + } + ); + + containerTest( + "exit strategy calls process.exit after the replication backend is killed", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + // Stub process.exit so the test process itself doesn't terminate. + // mockImplementation returns never; cast to satisfy the signature. + const exitSpy = vi + .spyOn(process, "exit") + .mockImplementation(((code?: number) => undefined as never) as typeof process.exit); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + errorRecovery: { + type: "exit", + // Short delay so the test stays quick; the flush window doesn't + // matter here because we're stubbing the actual exit call. + exitDelayMs: 100, + exitCode: 1, + }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + // Sanity check: replication is alive before the kill. + const runA = await createTaskRun(prisma, seed, "run_pre_exit"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + await killReplicationBackend(prisma, "runs-replication"); + + // Wait long enough for the error event to fire and the exit timer + // to elapse, plus slack. + await setTimeout(2000); + + expect(exitSpy).toHaveBeenCalledWith(1); + } finally { + exitSpy.mockRestore(); + await service.shutdown(); + } + } + ); + + containerTest( + "log strategy leaves replication permanently stopped", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication", + compression: { request: true }, + logLevel: "warn", + }); + + const service = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 1, + flushIntervalMs: 100, + flushBatchSize: 1, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logLevel: "warn", + errorRecovery: { type: "log" }, + }); + + try { + await service.start(); + const seed = await seedOrgProjectEnv(prisma); + + const runA = await createTaskRun(prisma, seed, "run_pre_log"); + await waitForRunIdsInClickHouse(clickhouse, [runA.id]); + + await killReplicationBackend(prisma, "runs-replication"); + + // Give the service time to attempt (and not) any recovery. + await setTimeout(2000); + + // Insert a row after the kill — under the log strategy nothing + // brings the stream back, so this should not appear in ClickHouse. + const runB = await createTaskRun(prisma, seed, "run_post_log"); + await setTimeout(3000); + + const ids = await readReplicatedRunIds(clickhouse); + expect(ids).toContain(runA.id); + expect(ids).not.toContain(runB.id); + } finally { + await service.shutdown(); + } + } + ); +}); + +// -------------------------------------------------------------------------- +// helpers +// -------------------------------------------------------------------------- + +type SeedRefs = { + organizationId: string; + projectId: string; + runtimeEnvironmentId: string; +}; + +async function seedOrgProjectEnv(prisma: any): Promise { + const organization = await prisma.organization.create({ + data: { title: "test", slug: "test" }, + }); + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + return { + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: runtimeEnvironment.id, + }; +} + +async function createTaskRun(prisma: any, seed: SeedRefs, friendlyId: string) { + return prisma.taskRun.create({ + data: { + friendlyId, + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: friendlyId, + spanId: friendlyId, + queue: "test", + runtimeEnvironmentId: seed.runtimeEnvironmentId, + projectId: seed.projectId, + organizationId: seed.organizationId, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); +} + +// Kills any active WAL-sender backends whose application_name matches the +// service. This mirrors the failover-style disconnect that surfaced the bug: +// the WAL stream connection drops and the LogicalReplicationClient errors. +async function killReplicationBackend(prisma: any, applicationName: string) { + // Wait briefly for the WAL sender to appear in pg_stat_replication after + // subscribe() completes — there's a small async gap between + // replicationStart firing and the row being visible to other sessions. + for (let attempt = 0; attempt < 20; attempt++) { + const rows = await prisma.$queryRawUnsafe<{ pid: number }[]>( + `SELECT pid FROM pg_stat_replication WHERE application_name = $1`, + applicationName + ); + if (rows.length > 0) { + for (const { pid } of rows) { + await prisma.$executeRawUnsafe(`SELECT pg_terminate_backend(${pid})`); + } + return; + } + await setTimeout(100); + } + throw new Error( + `No active replication backend found for application_name=${applicationName} after 2s` + ); +} + +async function readReplicatedRunIds(clickhouse: ClickHouse): Promise { + const queryRuns = clickhouse.reader.query({ + name: "runs-replication", + query: "SELECT run_id FROM trigger_dev.task_runs_v2", + schema: z.object({ run_id: z.string() }), + }); + const [queryError, result] = await queryRuns({}); + if (queryError) throw queryError; + return (result ?? []).map((row) => row.run_id); +} + +async function waitForRunIdsInClickHouse( + clickhouse: ClickHouse, + expectedIds: string[], + options: { timeoutMs?: number; pollIntervalMs?: number } = {} +) { + const timeoutMs = options.timeoutMs ?? 10_000; + const pollIntervalMs = options.pollIntervalMs ?? 250; + const deadline = Date.now() + timeoutMs; + let lastIds: string[] = []; + while (Date.now() < deadline) { + lastIds = await readReplicatedRunIds(clickhouse); + if (expectedIds.every((id) => lastIds.includes(id))) return; + await setTimeout(pollIntervalMs); + } + throw new Error( + `Timed out waiting for run ids ${JSON.stringify(expectedIds)} to land in ClickHouse. ` + + `Last seen: ${JSON.stringify(lastIds)}` + ); +} From 5ba46ff0c64779a085dd42a2213d0dad6e439cb1 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 13 May 2026 22:31:30 +0100 Subject: [PATCH 2/2] fix(webapp): reschedule reconnect when subscribe() throws MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR review feedback: - LogicalReplicationClient.subscribe() can throw before its internal "error" listener is wired up (notably when pg client.connect() fails mid-failover). The reconnect strategy's catch block only logged, so recovery silently stopped. Now also calls scheduleReconnect(err) — the pendingReconnect guard makes it idempotent if an error event was also emitted. - Reject negative values for the new replication-recovery env vars and cap exit codes at 255. - Convert the new ReplicationErrorRecovery{Deps,} interfaces to type aliases to match the repo's TypeScript style. - Tighten the reconnect dep comment to drop a stale "lastAcknowledgedLsn" reference (the wrapper-tracked resume LSN is what callers actually pass). - Restore process.exit after service.shutdown() in the exit-strategy test so a delayed exit timer can't terminate the test worker. --- apps/webapp/app/env.server.ts | 14 ++++++------- .../replicationErrorRecovery.server.ts | 20 +++++++++++-------- ...nsReplicationService.errorRecovery.test.ts | 4 +++- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 31589faaf96..2f5ef27cfd5 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1314,8 +1314,8 @@ const EnvironmentSchema = z RUN_REPLICATION_ERROR_STRATEGY: z .enum(["reconnect", "exit", "log"]) .default("reconnect"), - RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000), - RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1), + RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000), + RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1), // Session replication (Postgres → ClickHouse sessions_v1). Shares Redis // with the runs replicator for leader locking but has its own slot and @@ -1352,15 +1352,15 @@ const EnvironmentSchema = z SESSION_REPLICATION_ERROR_STRATEGY: z .enum(["reconnect", "exit", "log"]) .default("reconnect"), - SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000), - SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1), + SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000), + SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1), // Reconnect tuning shared across both replication services. Only // applies when error strategy is `reconnect`. Max attempts of 0 means // unlimited (default). - REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().default(1_000), - REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().default(60_000), - REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().default(0), + REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().min(0).default(1_000), + REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().min(0).default(60_000), + REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().min(0).default(0), // Clickhouse CLICKHOUSE_URL: z.string(), diff --git a/apps/webapp/app/services/replicationErrorRecovery.server.ts b/apps/webapp/app/services/replicationErrorRecovery.server.ts index ea3f2c0bf47..55f7495959d 100644 --- a/apps/webapp/app/services/replicationErrorRecovery.server.ts +++ b/apps/webapp/app/services/replicationErrorRecovery.server.ts @@ -27,18 +27,18 @@ export type ReplicationErrorRecoveryStrategy = } | { type: "log" }; -export interface ReplicationErrorRecoveryDeps { +export type ReplicationErrorRecoveryDeps = { strategy: ReplicationErrorRecoveryStrategy; logger: Logger; // Re-subscribe the underlying replication client. Implementations should - // call client.subscribe(lastAcknowledgedLsn) and resolve once that returns. + // call client.subscribe(...) and resolve once the stream is started. reconnect: () => Promise; // True once the host service has begun graceful shutdown — recovery // suppresses all work in that state. isShuttingDown: () => boolean; -} +}; -export interface ReplicationErrorRecovery { +export type ReplicationErrorRecovery = { // Called from the replication client's "error" event handler. handle(error: unknown): void; // Called from the replication client's "start" event handler. Resets the @@ -46,7 +46,7 @@ export interface ReplicationErrorRecovery { notifyStreamStarted(): void; // Cancel any pending reconnect/exit timer. Called from shutdown(). dispose(): void; -} +}; export function createReplicationErrorRecovery( deps: ReplicationErrorRecoveryDeps @@ -91,13 +91,17 @@ export function createReplicationErrorRecovery( // Success path is handled by notifyStreamStarted, which fires from // the replication client's "start" event after the stream is live. } catch (err) { - // subscribe() emits an "error" event of its own on failure, so the - // next attempt is scheduled by handle(). Log here anyway so reconnect - // failures stay visible even if the error event is suppressed. + // subscribe() can throw without first emitting an "error" event — + // notably when the initial pg client.connect() fails because Postgres + // is still unreachable mid-failover. Schedule the next attempt + // ourselves so recovery doesn't silently stop. If subscribe() did + // also emit an "error" event, handle() will call scheduleReconnect() + // first; the guard on pendingReconnect makes this idempotent. logger.error("Replication reconnect attempt failed", { attempt, error: err, }); + scheduleReconnect(err); } }, delay); } diff --git a/apps/webapp/test/runsReplicationService.errorRecovery.test.ts b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts index be3965ebebe..fc25c3b9eef 100644 --- a/apps/webapp/test/runsReplicationService.errorRecovery.test.ts +++ b/apps/webapp/test/runsReplicationService.errorRecovery.test.ts @@ -126,8 +126,10 @@ describe("RunsReplicationService error recovery", () => { expect(exitSpy).toHaveBeenCalledWith(1); } finally { - exitSpy.mockRestore(); + // shutdown() before mockRestore() so any in-flight exit timer can + // be disposed without terminating the Vitest worker. await service.shutdown(); + exitSpy.mockRestore(); } } );