Skip to content

Commit 9cb611d

Browse files
committed
refactor(webapp): cloud-driven basin sync, drop reconcile worker
The cloud billing app now drives basin lifecycle operations via a single webapp admin endpoint instead of the webapp enqueuing a reconcile worker that called back into billing to resolve plan + retention. Plan vocabulary now lives in cloud (`Limits.streamBasinRetention`); S2 access stays in the webapp. New: POST /admin/api/v1/orgs/:orgId/stream-basin - { action: "ensure", retention } — provision or PATCH retention - { action: "deprovision" } — null the column Helpers: ensureBasinForOrg / deprovisionBasinForOrg in the provisioner. Removed: - streamBasinRetentionByPlan.server.ts (plan vocabulary) - v3.reconcileStreamBasinForOrg worker job - enqueueStreamBasinReconcile from the three setPlan branches - admin.api.v1.stream-basins.{backfill,reconfigure}.ts (replaced by the per-org sync endpoint) - REALTIME_STREAMS_BASIN_RETENTION_FREE/HOBBY/PRO env vars The .server-changes entry stays accurate — feature behaviour is unchanged from a customer's perspective.
1 parent 684fc2e commit 9cb611d

9 files changed

Lines changed: 112 additions & 394 deletions

.server-changes/per-org-stream-basins.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ area: webapp
33
type: feature
44
---
55

6-
Per-org S2 stream basins with plan-tied retention (free 7d / hobby 30d / pro 365d), gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`. Stops basin retention from deleting streams out from under live chat sessions and unlocks per-org cost attribution via S2 basin metrics.
6+
Per-org S2 stream basins with retention tied to the org's billing plan, gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`. Stops basin retention from deleting streams out from under live chat sessions and unlocks per-org cost attribution.

apps/webapp/app/env.server.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,11 +1523,6 @@ const EnvironmentSchema = z
15231523
REALTIME_STREAMS_BASIN_NAME_PREFIX: z.string().default("triggerdotdev"),
15241524
REALTIME_STREAMS_BASIN_NAME_ENV: z.string().default("dev"),
15251525
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: durationString().default("30d"),
1526-
// Plan-specific retention overrides consulted by the
1527-
// streamBasinRetentionByPlan shim only.
1528-
REALTIME_STREAMS_BASIN_RETENTION_FREE: durationString().default("7d"),
1529-
REALTIME_STREAMS_BASIN_RETENTION_HOBBY: durationString().default("30d"),
1530-
REALTIME_STREAMS_BASIN_RETENTION_PRO: durationString().default("365d"),
15311526
REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"),
15321527
REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: durationString().default("1h"),
15331528
REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
4+
import { isValidDuration } from "~/services/realtime/duration.server";
5+
import {
6+
deprovisionBasinForOrg,
7+
ensureBasinForOrg,
8+
} from "~/services/realtime/streamBasinProvisioner.server";
9+
10+
const ParamsSchema = z.object({ organizationId: z.string() });
11+
12+
const BodySchema = z.discriminatedUnion("action", [
13+
z.object({
14+
action: z.literal("ensure"),
15+
retention: z
16+
.string()
17+
.refine(isValidDuration, "retention must be a duration like 7d, 30d, 365d, 1h, 1y"),
18+
}),
19+
z.object({ action: z.literal("deprovision") }),
20+
]);
21+
22+
export async function action({ request, params }: ActionFunctionArgs) {
23+
await requireAdminApiRequest(request);
24+
25+
const { organizationId } = ParamsSchema.parse(params);
26+
27+
let parsed: z.infer<typeof BodySchema>;
28+
try {
29+
const text = await request.text();
30+
const raw = text.length > 0 ? JSON.parse(text) : {};
31+
const result = BodySchema.safeParse(raw);
32+
if (!result.success) {
33+
return json({ ok: false, error: result.error.flatten() }, { status: 400 });
34+
}
35+
parsed = result.data;
36+
} catch {
37+
return json({ ok: false, error: "Invalid JSON body" }, { status: 400 });
38+
}
39+
40+
if (parsed.action === "ensure") {
41+
const result = await ensureBasinForOrg(organizationId, parsed.retention);
42+
return json({ ok: true, ...result });
43+
}
44+
45+
const result = await deprovisionBasinForOrg(organizationId);
46+
return json({ ok: true, ...result });
47+
}

apps/webapp/app/routes/admin.api.v1.stream-basins.backfill.ts

Lines changed: 0 additions & 146 deletions
This file was deleted.

apps/webapp/app/routes/admin.api.v1.stream-basins.reconfigure.ts

Lines changed: 0 additions & 70 deletions
This file was deleted.

apps/webapp/app/services/platform.v3.server.ts

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,6 @@ export async function setPlan(
403403
// Invalidate billing cache since plan changed
404404
opts?.invalidateBillingCache?.(organization.id);
405405
platformCache.entitlement.remove(organization.id).catch(() => {});
406-
await enqueueStreamBasinReconcile(organization.id);
407406
return redirect(newProjectPath(organization, "You're on the Free plan."));
408407
} else {
409408
return redirectWithErrorMessage(
@@ -421,38 +420,17 @@ export async function setPlan(
421420
// Invalidate billing cache since subscription changed
422421
opts?.invalidateBillingCache?.(organization.id);
423422
platformCache.entitlement.remove(organization.id).catch(() => {});
424-
await enqueueStreamBasinReconcile(organization.id);
425423
return redirectWithSuccessMessage(callerPath, request, "Subscription updated successfully.");
426424
}
427425
case "canceled_subscription": {
428426
// Invalidate billing cache since subscription was canceled
429427
opts?.invalidateBillingCache?.(organization.id);
430428
platformCache.entitlement.remove(organization.id).catch(() => {});
431-
await enqueueStreamBasinReconcile(organization.id);
432429
return redirectWithSuccessMessage(callerPath, request, "Subscription canceled.");
433430
}
434431
}
435432
}
436433

437-
// Best-effort: failures are logged but never block the plan change.
438-
// The reconciler is idempotent and re-reads the plan when it runs, so
439-
// concurrent plan changes collapse to one pending job per org.
440-
async function enqueueStreamBasinReconcile(orgId: string) {
441-
try {
442-
const { commonWorker } = await import("~/v3/commonWorker.server");
443-
await commonWorker.enqueue({
444-
job: "v3.reconcileStreamBasinForOrg",
445-
payload: { orgId },
446-
id: `reconcileStreamBasin:${orgId}`,
447-
});
448-
} catch (error) {
449-
logger.warn("[setPlan] failed to enqueue stream basin reconcile", {
450-
orgId,
451-
error: error instanceof Error ? error.message : String(error),
452-
});
453-
}
454-
}
455-
456434
export async function setConcurrencyAddOn(organizationId: string, amount: number) {
457435
if (!client) return undefined;
458436

0 commit comments

Comments
 (0)