Skip to content

Commit a38eb4f

Browse files
committed
refactor(webapp): per-org basins for paid orgs only
Free orgs share the global stream basin (the existing legacy fallback path); paid orgs get a dedicated per-org basin with retention tied to their tier. Cleaner story, much smaller S2 footprint, and basin existence becomes a real tier benefit rather than a default for everyone. A single `v3.reconcileStreamBasinForOrg` worker job handles every plan transition idempotently: free → paid: provision a new basin, stamp `Organization.streamBasinName`. paid → paid: reconfigure retention (tier-change). S2 retention only takes effect on new streams, but that's fine — old streams age out on their original retention. paid → free: null `Organization.streamBasinName`. Future runs/sessions for this org route through the shared global basin via the existing read-precedence fallback. The per-org basin lingers; existing streams there respect their original retention until they age out. free → free: no-op. Replaces the previous `provisionStreamBasinForOrg` / `reconfigureStreamBasinForOrg` job pair so callers don't have to choose the right job for the transition. `setPlan` enqueues `reconcile` from all three plan-changed branches; the admin backfill route enqueues `reconcile` for every non-deleted org (idempotent — the worker decides per-org what to do). Org create no longer provisions synchronously — new orgs start free and use the shared basin until their first paid upgrade. Verified locally: backfill correctly deprovisioned 4 free orgs (column nulled, basins left intact) and kept the 1 hobby-tier org's basin. A fresh chat for a free org streams into the shared basin under the legacy prefix `org/{orgId}/env/.../sessions/{chatId}/{io}` with no new streams in the old per-org basin.
1 parent 054d1af commit a38eb4f

6 files changed

Lines changed: 184 additions & 153 deletions

File tree

apps/webapp/app/models/organization.server.ts

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ import { env } from "~/env.server";
1414
import { featuresForUrl } from "~/features.server";
1515
import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server";
1616
import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server";
17-
import { logger } from "~/services/logger.server";
18-
import { provisionBasinForOrg } from "~/services/realtime/streamBasinProvisioner.server";
1917
export type { Organization };
2018

2119
const nanoid = customAlphabet("1234567890abcdef", 4);
@@ -84,25 +82,13 @@ export async function createOrganization(
8482
},
8583
});
8684

87-
// Provision the org's S2 basin synchronously so the very first run
88-
// gets `streamBasinName` stamped via the existing org read. New orgs
89-
// get the default retention; the plan-change path updates retention
90-
// later if the operator runs a billing-aware install. Soft-fail on
91-
// S2 errors so a transient outage doesn't block signup — the
92-
// backfill reconciler picks up any org left with `streamBasinName: null`.
93-
// No-op when `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false` (OSS mode).
94-
try {
95-
await provisionBasinForOrg({
96-
id: organization.id,
97-
streamBasinName: organization.streamBasinName,
98-
// No `retention` — provisioner uses `defaultRetention()`.
99-
});
100-
} catch (error) {
101-
logger.warn("[createOrganization] streamBasin provisioning failed; backfill will retry", {
102-
orgId: organization.id,
103-
error: error instanceof Error ? error.message : String(error),
104-
});
105-
}
85+
// No basin work at signup. New orgs start on free and share the
86+
// global stream basin via the read-precedence fallback
87+
// (`run.streamBasinName ?? session.streamBasinName ?? legacy basin`).
88+
// When the org upgrades to a paid plan, `setPlan` enqueues
89+
// `v3.reconcileStreamBasinForOrg` which provisions a dedicated
90+
// basin and stamps `streamBasinName`. Free → paid → free
91+
// round-trips are reconciled by the same job.
10692

10793
return { ...organization };
10894
}

apps/webapp/app/routes/admin.api.v1.stream-basins.backfill.ts

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,22 @@ import { commonWorker } from "~/v3/commonWorker.server";
77
import { logger } from "~/services/logger.server";
88

99
/**
10-
* One-shot backfill that enqueues `v3.provisionStreamBasinForOrg` for
11-
* every org with `streamBasinName: null`. Idempotent — re-running picks
12-
* up only the orgs that haven't been provisioned yet, and the worker
13-
* job itself is also idempotent (the provisioner short-circuits if the
14-
* org column is already set).
10+
* One-shot backfill that enqueues `v3.reconcileStreamBasinForOrg` for
11+
* every non-deleted org. The reconciler decides per-org what to do:
12+
* provision a basin for paid orgs that don't have one, reconfigure
13+
* retention for paid orgs whose tier changed, deprovision (null the
14+
* column) for free orgs that were mistakenly provisioned. Idempotent
15+
* — re-running converges to the desired state.
1516
*
1617
* - Admin auth via `requireAdminApiRequest` (PAT in `Authorization`).
1718
* - Refuses to run when `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false`
1819
* so OSS / s2-lite installs can't accidentally trigger basin
19-
* creation against a misconfigured backend.
20+
* operations against a misconfigured backend.
2021
* - `dryRun=true` (default false) returns the count without enqueueing.
2122
* - `limit` (default 1000, max 10000) caps a single invocation. Run
22-
* again to process more — the column filter naturally walks the
23-
* queue forward each call.
24-
* - Each job is keyed `provisionStreamBasin:<orgId>` so concurrent
25-
* backfill calls converge to one job per org instead of duplicating.
26-
*
27-
* Run from a shell:
28-
* curl -X POST -H "Authorization: Bearer $PAT" \
29-
* "https://api.trigger.dev/admin/api/v1/stream-basins/backfill?limit=200&dryRun=true"
23+
* again with the next batch.
24+
* - Each job is keyed `reconcileStreamBasin:<orgId>` so concurrent
25+
* calls converge to one job per org.
3026
*/
3127

3228
const BodySchema = z
@@ -59,8 +55,6 @@ export async function action({ request }: ActionFunctionArgs) {
5955
);
6056
}
6157

62-
// `application/json` POST body — empty body falls back to defaults so
63-
// a parameterless POST does the right thing for the default backfill.
6458
let parsed: z.infer<typeof BodySchema>;
6559
try {
6660
const text = await request.text();
@@ -76,21 +70,19 @@ export async function action({ request }: ActionFunctionArgs) {
7670

7771
const { dryRun, limit } = parsed;
7872

79-
// Page candidate orgs. Ordered by createdAt so re-runs walk the queue
80-
// forward predictably; deletedAt filter avoids resurrecting orgs.
73+
// Walk every non-deleted org. The reconcile worker is fast for the
74+
// no-op case (free with null column) so enqueueing for all is fine
75+
// — saves us from doing per-org billing lookups here just to filter
76+
// candidates.
8177
const candidates = await prisma.organization.findMany({
82-
where: {
83-
streamBasinName: null,
84-
deletedAt: null,
85-
},
78+
where: { deletedAt: null },
8679
orderBy: { createdAt: "asc" },
8780
take: limit,
8881
select: { id: true },
8982
});
9083

91-
// Total count of remaining nulls (for progress reporting).
92-
const remainingTotal = await prisma.organization.count({
93-
where: { streamBasinName: null, deletedAt: null },
84+
const totalOrgs = await prisma.organization.count({
85+
where: { deletedAt: null },
9486
});
9587

9688
if (dryRun) {
@@ -99,22 +91,19 @@ export async function action({ request }: ActionFunctionArgs) {
9991
dryRun: true,
10092
enqueued: 0,
10193
pending: candidates.length,
102-
remaining: Math.max(0, remainingTotal - candidates.length),
94+
remaining: Math.max(0, totalOrgs - candidates.length),
10395
orgIds: candidates.map((o) => o.id),
10496
};
10597
return json(response);
10698
}
10799

108-
// Enqueue one job per org. Per-org dedupe key collapses concurrent
109-
// backfill calls into a single pending job, and a job that's already
110-
// run (basin set) is a no-op on the worker side.
111100
let enqueued = 0;
112101
for (const org of candidates) {
113102
try {
114103
await commonWorker.enqueue({
115-
job: "v3.provisionStreamBasinForOrg",
104+
job: "v3.reconcileStreamBasinForOrg",
116105
payload: { orgId: org.id },
117-
id: `provisionStreamBasin:${org.id}`,
106+
id: `reconcileStreamBasin:${org.id}`,
118107
});
119108
enqueued += 1;
120109
} catch (error) {
@@ -130,11 +119,11 @@ export async function action({ request }: ActionFunctionArgs) {
130119
dryRun: false,
131120
enqueued,
132121
pending: candidates.length,
133-
remaining: Math.max(0, remainingTotal - enqueued),
122+
remaining: Math.max(0, totalOrgs - enqueued),
134123
orgIds: candidates.map((o) => o.id),
135124
};
136125

137-
logger.info("[stream-basins-backfill] enqueued provisioning jobs", {
126+
logger.info("[stream-basins-backfill] enqueued reconcile jobs", {
138127
enqueued,
139128
candidates: candidates.length,
140129
remaining: response.remaining,
@@ -149,17 +138,15 @@ export async function loader({ request }: ActionFunctionArgs) {
149138
await requireAdminApiRequest(request);
150139

151140
const totalOrgs = await prisma.organization.count({ where: { deletedAt: null } });
152-
const provisioned = await prisma.organization.count({
141+
const withBasin = await prisma.organization.count({
153142
where: { deletedAt: null, NOT: { streamBasinName: null } },
154143
});
155-
const remaining = totalOrgs - provisioned;
156144

157145
return json({
158146
ok: true,
159147
perOrgBasinsEnabled: isPerOrgBasinsEnabled(),
160148
totalOrgs,
161-
provisioned,
162-
remaining,
163-
completion: totalOrgs === 0 ? 1 : provisioned / totalOrgs,
149+
withBasin,
150+
withoutBasin: totalOrgs - withBasin,
164151
});
165152
}

apps/webapp/app/routes/admin.api.v1.stream-basins.reconfigure.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ export async function action({ request }: ActionFunctionArgs) {
6767
}
6868

6969
await commonWorker.enqueue({
70-
job: "v3.reconfigureStreamBasinForOrg",
70+
job: "v3.reconcileStreamBasinForOrg",
7171
payload: { orgId: parsed.data.orgId },
72-
id: `reconfigureStreamBasin:${parsed.data.orgId}`,
72+
id: `reconcileStreamBasin:${parsed.data.orgId}`,
7373
});
7474

7575
return json({ ok: true, mode: "queued", enqueued: parsed.data.orgId });

apps/webapp/app/services/platform.v3.server.ts

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ export async function setPlan(
403403
// Invalidate billing cache since plan changed
404404
opts?.invalidateBillingCache?.(organization.id);
405405
platformCache.entitlement.remove(organization.id).catch(() => {});
406-
await enqueueStreamBasinReconfigure(organization.id);
406+
await enqueueStreamBasinReconcile(organization.id);
407407
return redirect(newProjectPath(organization, "You're on the Free plan."));
408408
} else {
409409
return redirectWithErrorMessage(
@@ -421,40 +421,49 @@ export async function setPlan(
421421
// Invalidate billing cache since subscription changed
422422
opts?.invalidateBillingCache?.(organization.id);
423423
platformCache.entitlement.remove(organization.id).catch(() => {});
424-
await enqueueStreamBasinReconfigure(organization.id);
424+
await enqueueStreamBasinReconcile(organization.id);
425425
return redirectWithSuccessMessage(callerPath, request, "Subscription updated successfully.");
426426
}
427427
case "canceled_subscription": {
428428
// Invalidate billing cache since subscription was canceled
429429
opts?.invalidateBillingCache?.(organization.id);
430430
platformCache.entitlement.remove(organization.id).catch(() => {});
431-
await enqueueStreamBasinReconfigure(organization.id);
431+
await enqueueStreamBasinReconcile(organization.id);
432432
return redirectWithSuccessMessage(callerPath, request, "Subscription canceled.");
433433
}
434434
}
435435
}
436436

437437
/**
438-
* Best-effort enqueue: when an org's plan changes we want the per-org
439-
* S2 basin's retention to follow (free=7d, hobby=30d, pro=365d). The
440-
* worker job is idempotent and a no-op when per-org basins are disabled
441-
* or the org has no basin yet (OSS / pre-backfill). Failures are
442-
* logged but never block the plan change itself — billing has already
443-
* accepted by the time we reach this code.
438+
* Best-effort enqueue: when an org's plan changes we reconcile its
439+
* stream-basin state. The reconciler handles every transition:
440+
*
441+
* free → paid: provision a dedicated basin with the plan's retention.
442+
* paid → paid: reconfigure the existing basin's retention.
443+
* paid → free: null `Organization.streamBasinName`. Future runs/sessions
444+
* flow to the shared global basin; the per-org basin
445+
* lingers until existing streams age out on their original
446+
* retention.
447+
* free → free: no-op.
448+
*
449+
* Idempotent and a no-op when per-org basins are disabled or billing
450+
* isn't configured. Failures are logged but never block the plan
451+
* change itself — billing has already accepted by the time we reach
452+
* this code.
444453
*/
445-
async function enqueueStreamBasinReconfigure(orgId: string) {
454+
async function enqueueStreamBasinReconcile(orgId: string) {
446455
try {
447456
const { commonWorker } = await import("~/v3/commonWorker.server");
448457
await commonWorker.enqueue({
449-
job: "v3.reconfigureStreamBasinForOrg",
458+
job: "v3.reconcileStreamBasinForOrg",
450459
payload: { orgId },
451460
// Per-org dedupe key — concurrent plan changes collapse to one
452-
// pending reconfigure job. The job re-reads the current plan
453-
// when it executes, so the latest tier wins.
454-
id: `reconfigureStreamBasin:${orgId}`,
461+
// pending reconcile job. The job re-reads the current plan when
462+
// it executes, so the latest tier wins.
463+
id: `reconcileStreamBasin:${orgId}`,
455464
});
456465
} catch (error) {
457-
logger.warn("[setPlan] failed to enqueue stream basin reconfigure", {
466+
logger.warn("[setPlan] failed to enqueue stream basin reconcile", {
458467
orgId,
459468
error: error instanceof Error ? error.message : String(error),
460469
});

0 commit comments

Comments
 (0)