Skip to content

Commit 6f8cc24

Browse files
committed
fix(webapp): reschedule reconnect when subscribe() throws
Addresses PR review feedback:

- LogicalReplicationClient.subscribe() can throw before its internal "error" listener is wired up (notably when pg client.connect() fails mid-failover). The reconnect strategy's catch block only logged, so recovery silently stopped. Now it also calls scheduleReconnect(err) — the pendingReconnect guard makes this idempotent if an "error" event was also emitted.
- Reject negative values for the new replication-recovery env vars and cap exit codes at 255.
- Convert the new ReplicationErrorRecovery{Deps,} interfaces to type aliases to match the repo's TypeScript style.
- Tighten the reconnect dep comment to drop a stale "lastAcknowledgedLsn" reference (the wrapper-tracked resume LSN is what callers actually pass).
- Restore the process.exit spy only after service.shutdown() in the exit-strategy test, so a delayed exit timer can't terminate the test worker.
1 parent 6bfd109 commit 6f8cc24

3 files changed

Lines changed: 22 additions & 16 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,8 +1314,8 @@ const EnvironmentSchema = z
13141314
RUN_REPLICATION_ERROR_STRATEGY: z
13151315
.enum(["reconnect", "exit", "log"])
13161316
.default("reconnect"),
1317-
RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000),
1318-
RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1),
1317+
RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000),
1318+
RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1),
13191319

13201320
// Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
13211321
// with the runs replicator for leader locking but has its own slot and
@@ -1352,15 +1352,15 @@ const EnvironmentSchema = z
13521352
SESSION_REPLICATION_ERROR_STRATEGY: z
13531353
.enum(["reconnect", "exit", "log"])
13541354
.default("reconnect"),
1355-
SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().default(5_000),
1356-
SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().default(1),
1355+
SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000),
1356+
SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1),
13571357

13581358
// Reconnect tuning shared across both replication services. Only
13591359
// applies when error strategy is `reconnect`. Max attempts of 0 means
13601360
// unlimited (default).
1361-
REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().default(1_000),
1362-
REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().default(60_000),
1363-
REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().default(0),
1361+
REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().min(0).default(1_000),
1362+
REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().min(0).default(60_000),
1363+
REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().min(0).default(0),
13641364

13651365
// Clickhouse
13661366
CLICKHOUSE_URL: z.string(),

apps/webapp/app/services/replicationErrorRecovery.server.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,26 @@ export type ReplicationErrorRecoveryStrategy =
2727
}
2828
| { type: "log" };
2929

30-
export interface ReplicationErrorRecoveryDeps {
30+
export type ReplicationErrorRecoveryDeps = {
3131
strategy: ReplicationErrorRecoveryStrategy;
3232
logger: Logger;
3333
// Re-subscribe the underlying replication client. Implementations should
34-
// call client.subscribe(lastAcknowledgedLsn) and resolve once that returns.
34+
// call client.subscribe(...) and resolve once the stream is started.
3535
reconnect: () => Promise<void>;
3636
// True once the host service has begun graceful shutdown — recovery
3737
// suppresses all work in that state.
3838
isShuttingDown: () => boolean;
39-
}
39+
};
4040

41-
export interface ReplicationErrorRecovery {
41+
export type ReplicationErrorRecovery = {
4242
// Called from the replication client's "error" event handler.
4343
handle(error: unknown): void;
4444
// Called from the replication client's "start" event handler. Resets the
4545
// reconnect attempt counter so the next failure starts from initialDelayMs.
4646
notifyStreamStarted(): void;
4747
// Cancel any pending reconnect/exit timer. Called from shutdown().
4848
dispose(): void;
49-
}
49+
};
5050

5151
export function createReplicationErrorRecovery(
5252
deps: ReplicationErrorRecoveryDeps
@@ -91,13 +91,17 @@ export function createReplicationErrorRecovery(
9191
// Success path is handled by notifyStreamStarted, which fires from
9292
// the replication client's "start" event after the stream is live.
9393
} catch (err) {
94-
// subscribe() emits an "error" event of its own on failure, so the
95-
// next attempt is scheduled by handle(). Log here anyway so reconnect
96-
// failures stay visible even if the error event is suppressed.
94+
// subscribe() can throw without first emitting an "error" event —
95+
// notably when the initial pg client.connect() fails because Postgres
96+
// is still unreachable mid-failover. Schedule the next attempt
97+
// ourselves so recovery doesn't silently stop. If subscribe() did
98+
// also emit an "error" event, handle() will call scheduleReconnect()
99+
// first; the guard on pendingReconnect makes this idempotent.
97100
logger.error("Replication reconnect attempt failed", {
98101
attempt,
99102
error: err,
100103
});
104+
scheduleReconnect(err);
101105
}
102106
}, delay);
103107
}

apps/webapp/test/runsReplicationService.errorRecovery.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ describe("RunsReplicationService error recovery", () => {
126126

127127
expect(exitSpy).toHaveBeenCalledWith(1);
128128
} finally {
129-
exitSpy.mockRestore();
129+
// shutdown() before mockRestore() so any in-flight exit timer can
130+
// be disposed without terminating the Vitest worker.
130131
await service.shutdown();
132+
exitSpy.mockRestore();
131133
}
132134
}
133135
);

0 commit comments

Comments (0)