Skip to content

Commit d6d3586

Browse files
committed
refactor(webapp): drop plan vocabulary from streamBasinProvisioner
The provisioner is now purely retention-string-driven: callers pass a duration like "30d" and it does the S2 round-trip. No tier types, no plan-name matching, no billing imports. The plan-aware mapping moves into a new `streamBasinRetentionByPlan.server.ts` shim that's the only file in the webapp that knows about plan codes. Callers that resolve retention from a plan (the worker's backfill / reconfigure handlers) import the shim; callers that just want a default (the org-create path) call the provisioner without `retention`. Also addresses two review concerns: - `basinNameForOrg` now throws when the configured prefix + env-name leave zero or negative budget for the org slug. Without the guard a too-long prefix would produce `slice(0, 0) = ""` for every org and silently collide their basins via S2's idempotent-create path. - The plan-code → retention mapping uses an exact-match switch instead of substring matching. Substring matching against future plan codes could grant the wrong retention (e.g. `"approved"` matching `"pro"`). The known set is small and explicit; new plan codes go in the switch at launch. Net surface change: - `streamBasinProvisioner.server.ts`: drops `StreamBasinTier`, `planTierFor`, `retentionFor` exports. Adds `defaultRetention()`. `provisionBasinForOrg` takes `{ retention?: string }` instead of `{ tier?: StreamBasinTier }`. `reconfigureBasinForOrg` takes a retention string instead of a tier. - `streamBasinRetentionByPlan.server.ts` (new): exports `resolveRetentionForOrg(orgId)` and `retentionForPlanCode(code)`. - `commonWorker.server.ts`: handlers call the shim, hand a string to the provisioner. - Admin reconfigure route: replaces the `tier` body field with a direct `retention` duration override. - Org create: no longer passes `tier: "free"`; provisioner uses the default. - New env var `REALTIME_STREAMS_BASIN_DEFAULT_RETENTION` (default `30d`). Existing per-plan vars are still consulted by the shim only. Verified end-to-end with chat.agent locally — fresh chat lands in the per-org basin, multi-turn behaves the same, no leakage to the global fallback.
1 parent 692a257 commit d6d3586

6 files changed

Lines changed: 168 additions & 122 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,9 +1517,15 @@ const EnvironmentSchema = z
15171517
/// — kept short to stay under S2's basin-name length limit.
15181518
REALTIME_STREAMS_BASIN_NAME_PREFIX: z.string().default("triggerdotdev"),
15191519
REALTIME_STREAMS_BASIN_NAME_ENV: z.string().default("dev"),
1520-
/// Plan-tier retention strings (S2 duration syntax: 7d / 30d / 1y).
1521-
/// Free / hobby / pro line up with billing tiers; enterprise uses
1522-
/// the pro default and is reconfigured per-contract via the API.
1520+
/// Default retention for new basins (S2 duration syntax: 7d / 30d / 1y).
1521+
/// Used at org-create and as the fallback when no plan-specific
1522+
/// retention is resolved. Operators that don't run a billing API
1523+
/// only need this one.
1524+
REALTIME_STREAMS_BASIN_DEFAULT_RETENTION: z.string().default("30d"),
1525+
/// Plan-specific retention overrides — only consulted by the
1526+
/// optional `streamBasinRetentionByPlan` shim. Operators that
1527+
/// don't map plans to retention (OSS, self-hosted) can ignore
1528+
/// these and rely on the default above.
15231529
REALTIME_STREAMS_BASIN_RETENTION_FREE: z.string().default("7d"),
15241530
REALTIME_STREAMS_BASIN_RETENTION_HOBBY: z.string().default("30d"),
15251531
REALTIME_STREAMS_BASIN_RETENTION_PRO: z.string().default("365d"),

apps/webapp/app/models/organization.server.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,18 @@ export async function createOrganization(
8585
});
8686

8787
// Provision the org's S2 basin synchronously so the very first run
88-
// gets `streamBasinName` stamped via the existing org read. Soft-fail
89-
// on S2 errors so a transient outage doesn't block signup — the
88+
// gets `streamBasinName` stamped via the existing org read. New orgs
89+
// get the default retention; the plan-change path updates retention
90+
// later if the operator runs a billing-aware install. Soft-fail on
91+
// S2 errors so a transient outage doesn't block signup — the
9092
// backfill reconciler picks up any org left with `streamBasinName: null`.
9193
// No-op when `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false` (OSS mode).
9294
try {
9395
await provisionBasinForOrg({
9496
id: organization.id,
9597
slug: organization.slug,
96-
tier: "free", // new orgs always start on free retention
9798
streamBasinName: organization.streamBasinName,
99+
// No `retention` — provisioner uses `defaultRetention()`.
98100
});
99101
} catch (error) {
100102
logger.warn("[createOrganization] streamBasin provisioning failed; backfill will retry", {

apps/webapp/app/routes/admin.api.v1.stream-basins.reconfigure.ts

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,26 @@ import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
44
import {
55
isPerOrgBasinsEnabled,
66
reconfigureBasinForOrg,
7-
type StreamBasinTier,
87
} from "~/services/realtime/streamBasinProvisioner.server";
98
import { commonWorker } from "~/v3/commonWorker.server";
109

1110
/**
1211
* Admin trigger for `v3.reconfigureStreamBasinForOrg`. The plan-change
13-
* path in `setPlan` already enqueues this automatically in cloud mode;
12+
* path in `setPlan` enqueues this automatically when billing is wired;
1413
* this route exists for ops + e2e testing.
1514
*
1615
* - Default (`{ orgId }`): enqueues the worker job which resolves the
17-
* tier via `getCurrentPlan` and PATCHes the basin to match. No-op
18-
* locally because `getCurrentPlan` is gated to cloud hosts.
19-
* - With `tier`: bypasses the billing lookup and runs reconfigure
20-
* inline against the given tier. Useful for validating the PATCH
21-
* wire shape end-to-end and as a manual override (e.g. enterprise
22-
* contract retention).
16+
* retention from the org's plan and PATCHes the basin to match.
17+
* No-op when billing isn't configured (OSS).
18+
* - With `retention`: bypasses the billing lookup and runs reconfigure
19+
* inline against the given duration string (e.g. `"7d"`, `"30d"`,
20+
* `"365d"`, `"1y"`). Useful for validating the PATCH wire shape
21+
* end-to-end and as a manual override (e.g. enterprise contracts).
2322
*/
2423
const BodySchema = z
2524
.object({
2625
orgId: z.string(),
27-
tier: z.enum(["free", "hobby", "pro"]).optional(),
26+
retention: z.string().optional(),
2827
})
2928
.strict();
3029

@@ -50,13 +49,17 @@ export async function action({ request }: ActionFunctionArgs) {
5049
return json({ ok: false, error: parsed.error.flatten() }, { status: 400 });
5150
}
5251

53-
if (parsed.data.tier) {
54-
// Direct, synchronous reconfigure with the explicit tier override.
55-
// Skips the worker queue + billing lookup so the PATCH is verifiable
56-
// in the response. Errors surface as 500.
57-
const tier: StreamBasinTier = parsed.data.tier;
58-
await reconfigureBasinForOrg(parsed.data.orgId, tier);
59-
return json({ ok: true, mode: "inline", orgId: parsed.data.orgId, tier });
52+
if (parsed.data.retention) {
53+
// Direct, synchronous reconfigure with the explicit retention.
54+
// Skips the worker queue + billing lookup so the PATCH is
55+
// verifiable in the response. Errors surface as 500.
56+
await reconfigureBasinForOrg(parsed.data.orgId, parsed.data.retention);
57+
return json({
58+
ok: true,
59+
mode: "inline",
60+
orgId: parsed.data.orgId,
61+
retention: parsed.data.retention,
62+
});
6063
}
6164

6265
await commonWorker.enqueue({

apps/webapp/app/services/realtime/streamBasinProvisioner.server.ts

Lines changed: 48 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -8,93 +8,80 @@
88
* basin in `REALTIME_STREAMS_S2_BASIN`. `Organization.streamBasinName`
99
* stays null forever; reads / writes resolve to the global basin.
1010
*
11-
* - **Per-org-basin mode** (cloud):
11+
* - **Per-org-basin mode**:
1212
* `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=true`. Each org gets a
13-
* dedicated basin with retention tied to its billing plan. The
14-
* basin is the unit of cost attribution (S2 exposes per-basin
15-
* metrics) and isolation (access tokens scope to one basin).
13+
* dedicated basin with its own retention. The basin is the unit of
14+
* cost attribution (S2 exposes per-basin metrics) and isolation
15+
* (access tokens scope to one basin).
1616
*
17-
* Provisioning is one-shot per org: at creation time (or a one-off
18-
* backfill for existing orgs) we create the basin and stamp
17+
* This module is purely retention-string-driven: callers pass a
18+
* duration like `"30d"` and the provisioner does the S2 round-trip.
19+
* It has no concept of plans / tiers / billing — operators that want
20+
* per-tier retention live one layer up (see
21+
* `streamBasinRetentionByPlan.server.ts`).
22+
*
23+
* Provisioning is one-shot per org: at creation time (or via the
24+
* backfill worker job for existing orgs) we create the basin and stamp
1925
* `Organization.streamBasinName`. New `TaskRun` / `Session` rows then
2026
* piggyback on the existing org read in `triggerTask` / session-create
2127
* paths and copy the value through. Reads use a precedence chain
2228
* (`run.streamBasinName ?? session.streamBasinName ?? globalBasin`).
2329
*
24-
* Plan changes update retention in-place via `reconfigureBasin`. We do
25-
* not move data across basins.
30+
* Plan / retention changes update retention in-place via
31+
* `reconfigureBasin`. We do not move data across basins.
2632
*/
2733
import type { PrismaClientOrTransaction } from "~/db.server";
2834
import { prisma } from "~/db.server";
2935
import { env } from "~/env.server";
3036
import { logger } from "~/services/logger.server";
3137

32-
/**
33-
* Plan-tier shorthand for retention mapping. Callers translate the
34-
* org's billing plan (via `getCurrentPlan`) into one of these and pass
35-
* it to the provisioner. New orgs (no plan yet) and unbilled orgs
36-
* default to `free` so we don't accidentally grant a year of retention
37-
* to a freeloader.
38-
*/
39-
export type StreamBasinTier = "free" | "hobby" | "pro";
40-
41-
export function retentionFor(tier: StreamBasinTier): string {
42-
switch (tier) {
43-
case "pro":
44-
return env.REALTIME_STREAMS_BASIN_RETENTION_PRO;
45-
case "hobby":
46-
return env.REALTIME_STREAMS_BASIN_RETENTION_HOBBY;
47-
case "free":
48-
default:
49-
return env.REALTIME_STREAMS_BASIN_RETENTION_FREE;
50-
}
38+
export function isPerOrgBasinsEnabled(): boolean {
39+
return env.REALTIME_STREAMS_PER_ORG_BASINS_ENABLED === "true";
5140
}
5241

5342
/**
54-
* Permissive plan-name → tier mapping. Billing returns various strings
55-
* over time (`free_connected`, `hobby`, `team_pro`, `enterprise`, etc.)
56-
* — be forgiving but predictable.
43+
* Default retention for new orgs and any caller that doesn't specify
44+
* a value. Configurable via `REALTIME_STREAMS_BASIN_DEFAULT_RETENTION`.
5745
*/
58-
export function planTierFor(planType: string | null | undefined): StreamBasinTier {
59-
if (!planType) return "free";
60-
const normalized = planType.toLowerCase();
61-
if (normalized.includes("pro") || normalized.includes("team") || normalized.includes("enterprise")) {
62-
return "pro";
63-
}
64-
if (normalized.includes("hobby") || normalized.includes("starter")) {
65-
return "hobby";
66-
}
67-
return "free";
68-
}
69-
70-
export function isPerOrgBasinsEnabled(): boolean {
71-
return env.REALTIME_STREAMS_PER_ORG_BASINS_ENABLED === "true";
46+
export function defaultRetention(): string {
47+
return env.REALTIME_STREAMS_BASIN_DEFAULT_RETENTION;
7248
}
7349

7450
/**
75-
* Build the basin name for an org. Format: `{prefix}-{env}-org-{slug}`
76-
* (e.g. `triggerdotdev-prod-org-acme-corp`). The org slug is already
77-
* lowercase-and-hyphenated by `createOrganization`, so it satisfies S2
78-
* basin-name rules without further normalization. We truncate
79-
* defensively to keep total length under 63 chars (a common bucket
80-
* convention; verify against S2 docs before raising).
51+
* Build the basin name for an org. Format: `{prefix}-{env}-org-{slug}`.
52+
* The org slug is already lowercase-and-hyphenated by
53+
* `createOrganization`, so it satisfies S2 basin-name rules without
54+
* further normalization. We truncate defensively to keep total length
55+
* under 63 chars (a common bucket convention; verify against S2 docs
56+
* before raising).
57+
*
58+
* Throws if `REALTIME_STREAMS_BASIN_NAME_PREFIX` +
59+
* `REALTIME_STREAMS_BASIN_NAME_ENV` are configured so long that no
60+
* room remains for the slug — without this guard, `slice(0, 0)` would
61+
* return an empty string and every org would share the same name,
62+
* silently colliding via S2's 409-on-create.
8163
*/
8264
export function basinNameForOrg(org: { slug: string }): string {
8365
const prefix = env.REALTIME_STREAMS_BASIN_NAME_PREFIX;
8466
const envName = env.REALTIME_STREAMS_BASIN_NAME_ENV;
8567
const head = `${prefix}-${envName}-org-`;
8668
const budget = 63 - head.length;
69+
if (budget <= 0) {
70+
throw new Error(
71+
`[streamBasinProvisioner] REALTIME_STREAMS_BASIN_NAME_PREFIX + REALTIME_STREAMS_BASIN_NAME_ENV too long: head="${head}" leaves no room for the org slug (budget=${budget}). Shorten the prefix or env-name values.`
72+
);
73+
}
8774
const slug = org.slug.slice(0, budget);
8875
return `${head}${slug}`;
8976
}
9077

9178
type ProvisionInput = {
9279
id: string;
9380
slug: string;
94-
/// Caller decides the tier. Org-create path passes `"free"` for new
95-
/// orgs; the backfill worker resolves the tier via `getCurrentPlan`
96-
/// before calling. Defaults to `"free"` if omitted.
97-
tier?: StreamBasinTier;
81+
/// Duration string passed straight to S2. Defaults to
82+
/// `defaultRetention()` when omitted. Caller decides; the provisioner
83+
/// has no opinion about what retention is appropriate.
84+
retention?: string;
9885
streamBasinName: string | null | undefined;
9986
};
10087

@@ -109,9 +96,9 @@ type ProvisionResult =
10996
* success) and writes the column.
11097
*
11198
* Failure modes:
112-
* - S2 unreachable / 5xx: throws. Callers in the org-create path
113-
* should swallow + enqueue a retry job so signup never fails on a
114-
* transient S2 outage. The backfill worker retries naturally.
99+
* - S2 unreachable / 5xx / timeout: throws. Callers in the org-create
100+
* path swallow + leave the column null so the backfill worker can
101+
* retry, so signup never fails on a transient S2 outage.
115102
* - Auth misconfig (no token): throws. Should never happen in
116103
* per-org-basins mode but worth surfacing loudly.
117104
*/
@@ -135,7 +122,7 @@ export async function provisionBasinForOrg(
135122
}
136123

137124
const basin = basinNameForOrg(org);
138-
const retention = retentionFor(org.tier ?? "free");
125+
const retention = org.retention ?? defaultRetention();
139126

140127
await s2CreateBasin(basin, {
141128
accessToken,
@@ -159,13 +146,12 @@ export async function provisionBasinForOrg(
159146
}
160147

161148
/**
162-
* Update retention after a plan change. Idempotent. No-op when the
163-
* org has no provisioned basin. Caller resolves the tier and passes
164-
* it in — keeps the provisioner ignorant of billing.
149+
* Update retention in-place. Idempotent. No-op when the org has no
150+
* provisioned basin.
165151
*/
166152
export async function reconfigureBasinForOrg(
167153
orgId: string,
168-
tier: StreamBasinTier
154+
retention: string
169155
): Promise<void> {
170156
if (!isPerOrgBasinsEnabled()) return;
171157

@@ -178,7 +164,6 @@ export async function reconfigureBasinForOrg(
178164
});
179165
if (!org?.streamBasinName) return;
180166

181-
const retention = retentionFor(tier);
182167
await s2ReconfigureBasin(org.streamBasinName, { accessToken, retentionPolicy: retention });
183168

184169
logger.info("[streamBasinProvisioner] reconfigured basin retention", {
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Cloud-flavored shim that resolves a stream-basin retention duration
3+
* from an org's current billing plan.
4+
*
5+
* Kept deliberately separate from `streamBasinProvisioner.server.ts`
6+
* so the provisioner stays purely retention-string-driven and has no
7+
* coupling to plan vocabulary. This file is the only place in the
8+
* webapp that maps "plan code" → "retention duration".
9+
*
10+
* Operators that don't run a billing API just don't call this — the
11+
* provisioner accepts retention strings directly, and the org-create
12+
* path falls back to `defaultRetention()`.
13+
*/
14+
import { env } from "~/env.server";
15+
import { getCurrentPlan } from "~/services/platform.v3.server";
16+
import { defaultRetention } from "./streamBasinProvisioner.server";
17+
18+
/**
19+
* Resolve the retention duration for an org based on its current plan.
20+
*
21+
* - Returns the configured retention for the plan when the billing
22+
* API has data.
23+
* - Returns `defaultRetention()` when no billing client is configured
24+
* (OSS / non-cloud installs that flipped per-org basins on without
25+
* wiring billing).
26+
* - **Throws** when billing is configured but the call failed, so
27+
* the redis-worker retry kicks in and we don't silently downgrade
28+
* a paid org's retention.
29+
*/
30+
export async function resolveRetentionForOrg(orgId: string): Promise<string> {
31+
const plan = await getCurrentPlan(orgId);
32+
33+
if (plan === undefined) {
34+
// We can't tell from `getCurrentPlan` alone whether the billing
35+
// client isn't configured (OSS) or whether the call failed
36+
// (transient cloud outage). Today we conservatively throw so
37+
// cloud installs retry. OSS installs that hit this path either:
38+
// (a) flipped the per-org-basins flag on without wiring billing
39+
// and should configure `BILLING_API_URL` / `BILLING_API_KEY`,
40+
// or
41+
// (b) shouldn't be calling this at all and should pass an
42+
// explicit retention to the provisioner.
43+
throw new Error(
44+
`[streamBasinRetentionByPlan] billing plan unavailable for org ${orgId}; will retry`
45+
);
46+
}
47+
48+
return retentionForPlanCode(plan.v3Subscription?.plan?.code);
49+
}
50+
51+
/**
52+
* Map a plan code to a retention duration via env-var lookup.
53+
*
54+
* Exact-match against a small known set rather than substring matching,
55+
* since substring matching against future plan codes could grant the
56+
* wrong tier (e.g. `"approved"` would match `"pro"`). Add a new code
57+
* here when launching a new plan.
58+
*/
59+
export function retentionForPlanCode(code: string | null | undefined): string {
60+
if (!code) return defaultRetention();
61+
62+
switch (code) {
63+
case "free":
64+
return env.REALTIME_STREAMS_BASIN_RETENTION_FREE;
65+
case "v3_hobby_1":
66+
return env.REALTIME_STREAMS_BASIN_RETENTION_HOBBY;
67+
case "v3_pro_1":
68+
case "enterprise":
69+
return env.REALTIME_STREAMS_BASIN_RETENTION_PRO;
70+
default:
71+
return defaultRetention();
72+
}
73+
}

0 commit comments

Comments
 (0)