Commit a23501c
feat(webapp,run-engine): per-org S2 basin migration (TRI-9073)
Move from a single shared basin (`triggerdotdev-prod-streams`) with a 30d blanket retention to per-org basins with retention tied to the org's billing plan (free 7d / hobby 30d / pro 365d). Fixes TRIGGER-CLOUD-KK noise from S2 deleting streams out from under live chat sessions when the basin retention fires before the session ends, gives per-org cost attribution via S2's basin metrics API, and narrows the blast radius of any leaked scoped token. OSS / s2-lite installs are unaffected: provisioning is gated by `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED`, and the read precedence falls back to the global basin env var when an entity has no stamped basin.

Schema: nullable `streamBasinName` on Organization / TaskRun / Session (migration `20260504071227_add_stream_basin_name`), stamped at provisioning / trigger / session-create. Reads resolve via `run.streamBasinName ?? session.streamBasinName ?? legacy basin`.

Provisioner: new `streamBasinProvisioner.server.ts` creates basins via S2's `POST /v1/basins`, reconfigures via `PATCH /v1/basins/{name}`, and maps plan codes to retention durations. Idempotent on race / pre-existing basin (treats 409 as success). Org create wires it in synchronously with soft-fail. Plan changes in `setPlan` enqueue `v3.reconfigureStreamBasinForOrg` next to the existing billing-cache invalidations.

Worker jobs: `v3.provisionStreamBasinForOrg` (backfill / retry) and `v3.reconfigureStreamBasinForOrg` (plan change), both on commonWorker.

Read path: `getRealtimeStreamInstance` becomes a factory keyed by `{ run, session }` basin context. The stream prefix drops the `org/{orgId}` segment for per-org basins (the basin already isolates the org) and keeps it for the legacy basin (orgs share it). The access-token cache key includes the basin to prevent cross-contamination.

Admin routes:
- POST /admin/api/v1/stream-basins/backfill — fans out provisioning jobs for every org with `streamBasinName: null`; supports `dryRun` + `limit` flags. GET returns progress (`provisioned / totalOrgs`).
- POST /admin/api/v1/stream-basins/reconfigure — enqueues the worker job (queued mode) or runs inline with a `tier` override (escape hatch).

Run-engine: `streamBasinName` added to `TriggerParams` (optional); the V2 trigger path stamps it onto the new TaskRun. No changes to `MinimalAuthenticatedEnvironment` — stamping is a trigger-time concern, not a queue concern.

Verified end-to-end with chat.agent locally: the backfill creates basins with the right retention (7d free), reconfigure flips retention via plan change (30d hobby / 365d pro), chat streams land in the per-org basin with zero leakage to the legacy basin, and multi-turn reuses the same in/out stream pair.
1 parent b19cf6d commit a23501c

24 files changed

Lines changed: 837 additions & 53 deletions
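The naming pattern and read precedence described in the commit message can be sketched as follows. This is an illustrative sketch only: the helper names and argument shapes are assumptions, not the API shipped in this commit (the real logic lives in `streamBasinProvisioner.server.ts` and `getRealtimeStreamInstance`).

```typescript
type BasinContext = {
  run?: { streamBasinName: string | null };
  session?: { streamBasinName: string | null };
};

// Naming pattern: `{prefix}-{env}-org-{slug}`.
function basinNameForOrg(prefix: string, envName: string, orgSlug: string): string {
  return `${prefix}-${envName}-org-${orgSlug}`;
}

// Read precedence: run basin, then session basin, then the legacy global basin.
function resolveBasin(ctx: BasinContext, legacyBasin: string): string {
  return ctx.run?.streamBasinName ?? ctx.session?.streamBasinName ?? legacyBasin;
}
```

Because the fallback is the last operand of the `??` chain, an install with no stamped basins anywhere (the OSS / s2-lite case) always resolves to the legacy basin.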

apps/webapp/app/env.server.ts

Lines changed: 22 additions & 0 deletions
```diff
@@ -1506,6 +1506,28 @@ const EnvironmentSchema = z
   REALTIME_STREAMS_S2_FLUSH_INTERVAL_MS: z.coerce.number().int().default(100),
   REALTIME_STREAMS_S2_MAX_RETRIES: z.coerce.number().int().default(10),
   REALTIME_STREAMS_S2_WAIT_SECONDS: z.coerce.number().int().default(60),
+  /// Per-org basin migration. When "true", the webapp provisions a
+  /// dedicated S2 basin per org with plan-tied retention and stamps
+  /// `streamBasinName` on new TaskRun / Session rows. OSS / s2-lite
+  /// installs leave this off and keep using the single basin defined
+  /// by `REALTIME_STREAMS_S2_BASIN`.
+  REALTIME_STREAMS_PER_ORG_BASINS_ENABLED: z.enum(["true", "false"]).default("false"),
+  /// Naming pattern for per-org basins: `{prefix}-{env}-org-{slug}`
+  /// e.g. `triggerdotdev-prod-org-acme-corp`. Cluster + tier shorthand
+  /// — kept short to stay under S2's basin-name length limit.
+  REALTIME_STREAMS_BASIN_NAME_PREFIX: z.string().default("triggerdotdev"),
+  REALTIME_STREAMS_BASIN_NAME_ENV: z.string().default("dev"),
+  /// Plan-tier retention strings (S2 duration syntax: 7d / 30d / 1y).
+  /// Free / hobby / pro line up with billing tiers; enterprise uses
+  /// the pro default and is reconfigured per-contract via the API.
+  REALTIME_STREAMS_BASIN_RETENTION_FREE: z.string().default("7d"),
+  REALTIME_STREAMS_BASIN_RETENTION_HOBBY: z.string().default("30d"),
+  REALTIME_STREAMS_BASIN_RETENTION_PRO: z.string().default("365d"),
+  /// Storage class applied to per-org basins at create time.
+  REALTIME_STREAMS_BASIN_STORAGE_CLASS: z.enum(["express", "standard"]).default("express"),
+  /// `delete_on_empty_min_age` applied to per-org basins. Streams
+  /// that go empty for this long are reaped automatically.
+  REALTIME_STREAMS_BASIN_DELETE_ON_EMPTY_MIN_AGE: z.string().default("1h"),
   REALTIME_STREAMS_DEFAULT_VERSION: z.enum(["v1", "v2"]).default("v1"),
   WAIT_UNTIL_TIMEOUT_MS: z.coerce.number().int().default(600_000),
```
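The retention defaults above imply a plan-tier to duration mapping. A minimal sketch, assuming a `retentionForTier` helper (a hypothetical name; the provisioner's actual API is not shown in this hunk):

```typescript
type StreamBasinTier = "free" | "hobby" | "pro";

// Mirrors the env defaults: REALTIME_STREAMS_BASIN_RETENTION_{FREE,HOBBY,PRO}.
const RETENTION_BY_TIER: Record<StreamBasinTier, string> = {
  free: "7d",
  hobby: "30d",
  pro: "365d",
};

function retentionForTier(tier: StreamBasinTier): string {
  // Enterprise has no tier of its own here: it uses the pro default and is
  // reconfigured per-contract via the admin reconfigure route.
  return RETENTION_BY_TIER[tier];
}
```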

apps/webapp/app/models/organization.server.ts

Lines changed: 21 additions & 0 deletions
```diff
@@ -14,6 +14,8 @@ import { env } from "~/env.server";
 import { featuresForUrl } from "~/features.server";
 import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server";
 import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server";
+import { logger } from "~/services/logger.server";
+import { provisionBasinForOrg } from "~/services/realtime/streamBasinProvisioner.server";
 
 export type { Organization };
 
 const nanoid = customAlphabet("1234567890abcdef", 4);
@@ -82,6 +84,25 @@ export async function createOrganization(
     },
   });
 
+  // Provision the org's S2 basin synchronously so the very first run
+  // gets `streamBasinName` stamped via the existing org read. Soft-fail
+  // on S2 errors so a transient outage doesn't block signup — the
+  // backfill reconciler picks up any org left with `streamBasinName: null`.
+  // No-op when `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false` (OSS mode).
+  try {
+    await provisionBasinForOrg({
+      id: organization.id,
+      slug: organization.slug,
+      tier: "free", // new orgs always start on free retention
+      streamBasinName: organization.streamBasinName,
+    });
+  } catch (error) {
+    logger.warn("[createOrganization] streamBasin provisioning failed; backfill will retry", {
+      orgId: organization.id,
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+
   return { ...organization };
 }
```
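The commit message says the provisioner "treats 409 as success". A hypothetical sketch of that idempotency rule; the S2 client shape here is an assumption, not the actual provisioner code:

```typescript
type CreateBasin = (name: string) => Promise<{ status: number }>;

async function createBasinIdempotent(
  createBasin: CreateBasin,
  name: string
): Promise<"created" | "already-exists"> {
  const res = await createBasin(name);
  if (res.status === 409) {
    // Another caller (or a previous attempt) already created the basin;
    // losing the race is success, not an error.
    return "already-exists";
  }
  if (res.status >= 400) {
    throw new Error(`basin create failed with ${res.status}`);
  }
  return "created";
}
```

This is what makes both the synchronous org-create path and the backfill worker safe to re-run against the same org.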

Lines changed: 165 additions & 0 deletions
```ts
import { json, type ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
import { isPerOrgBasinsEnabled } from "~/services/realtime/streamBasinProvisioner.server";
import { commonWorker } from "~/v3/commonWorker.server";
import { logger } from "~/services/logger.server";

/**
 * One-shot backfill that enqueues `v3.provisionStreamBasinForOrg` for
 * every org with `streamBasinName: null`. Idempotent — re-running picks
 * up only the orgs that haven't been provisioned yet, and the worker
 * job itself is also idempotent (the provisioner short-circuits if the
 * org column is already set).
 *
 * - Admin auth via `requireAdminApiRequest` (PAT in `Authorization`).
 * - Refuses to run when `REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=false`
 *   so OSS / s2-lite installs can't accidentally trigger basin
 *   creation against a misconfigured backend.
 * - `dryRun=true` (default false) returns the count without enqueueing.
 * - `limit` (default 1000, max 10000) caps a single invocation. Run
 *   again to process more — the column filter naturally walks the
 *   queue forward each call.
 * - Each job is keyed `provisionStreamBasin:<orgId>` so concurrent
 *   backfill calls converge to one job per org instead of duplicating.
 *
 * Run from a shell:
 *   curl -X POST -H "Authorization: Bearer $PAT" \
 *     "https://api.trigger.dev/admin/api/v1/stream-basins/backfill?limit=200&dryRun=true"
 */

const BodySchema = z
  .object({
    dryRun: z.boolean().optional().default(false),
    limit: z.number().int().min(1).max(10_000).optional().default(1000),
  })
  .strict();

type BackfillResponse = {
  ok: true;
  dryRun: boolean;
  enqueued: number;
  pending: number;
  remaining: number;
  orgIds: string[];
};

export async function action({ request }: ActionFunctionArgs) {
  await requireAdminApiRequest(request);

  if (!isPerOrgBasinsEnabled()) {
    return json(
      {
        ok: false,
        error:
          "Per-org stream basins are disabled. Set REALTIME_STREAMS_PER_ORG_BASINS_ENABLED=true before running the backfill.",
      },
      { status: 400 }
    );
  }

  // `application/json` POST body — empty body falls back to defaults so
  // a parameterless POST does the right thing for the default backfill.
  let parsed: z.infer<typeof BodySchema>;
  try {
    const text = await request.text();
    const raw = text.length > 0 ? JSON.parse(text) : {};
    const result = BodySchema.safeParse(raw);
    if (!result.success) {
      return json({ ok: false, error: result.error.flatten() }, { status: 400 });
    }
    parsed = result.data;
  } catch {
    return json({ ok: false, error: "Invalid JSON body" }, { status: 400 });
  }

  const { dryRun, limit } = parsed;

  // Page candidate orgs. Ordered by createdAt so re-runs walk the queue
  // forward predictably; deletedAt filter avoids resurrecting orgs.
  const candidates = await prisma.organization.findMany({
    where: {
      streamBasinName: null,
      deletedAt: null,
    },
    orderBy: { createdAt: "asc" },
    take: limit,
    select: { id: true },
  });

  // Total count of remaining nulls (for progress reporting).
  const remainingTotal = await prisma.organization.count({
    where: { streamBasinName: null, deletedAt: null },
  });

  if (dryRun) {
    const response: BackfillResponse = {
      ok: true,
      dryRun: true,
      enqueued: 0,
      pending: candidates.length,
      remaining: Math.max(0, remainingTotal - candidates.length),
      orgIds: candidates.map((o) => o.id),
    };
    return json(response);
  }

  // Enqueue one job per org. Per-org dedupe key collapses concurrent
  // backfill calls into a single pending job, and a job that's already
  // run (basin set) is a no-op on the worker side.
  let enqueued = 0;
  for (const org of candidates) {
    try {
      await commonWorker.enqueue({
        job: "v3.provisionStreamBasinForOrg",
        payload: { orgId: org.id },
        id: `provisionStreamBasin:${org.id}`,
      });
      enqueued += 1;
    } catch (error) {
      logger.error("[stream-basins-backfill] enqueue failed", {
        orgId: org.id,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }

  const response: BackfillResponse = {
    ok: true,
    dryRun: false,
    enqueued,
    pending: candidates.length,
    remaining: Math.max(0, remainingTotal - enqueued),
    orgIds: candidates.map((o) => o.id),
  };

  logger.info("[stream-basins-backfill] enqueued provisioning jobs", {
    enqueued,
    candidates: candidates.length,
    remaining: response.remaining,
  });

  return json(response);
}

// GET returns the current state without doing anything — useful for
// monitoring "is the backfill done yet?" from a dashboard / curl.
export async function loader({ request }: ActionFunctionArgs) {
  await requireAdminApiRequest(request);

  const totalOrgs = await prisma.organization.count({ where: { deletedAt: null } });
  const provisioned = await prisma.organization.count({
    where: { deletedAt: null, NOT: { streamBasinName: null } },
  });
  const remaining = totalOrgs - provisioned;

  return json({
    ok: true,
    perOrgBasinsEnabled: isPerOrgBasinsEnabled(),
    totalOrgs,
    provisioned,
    remaining,
    completion: totalOrgs === 0 ? 1 : provisioned / totalOrgs,
  });
}
```
Lines changed: 63 additions & 0 deletions
```ts
import { json, type ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
import {
  isPerOrgBasinsEnabled,
  reconfigureBasinForOrg,
  type StreamBasinTier,
} from "~/services/realtime/streamBasinProvisioner.server";
import { commonWorker } from "~/v3/commonWorker.server";

/**
 * Admin trigger for `v3.reconfigureStreamBasinForOrg`. The plan-change
 * path in `setPlan` already enqueues this automatically in cloud mode;
 * this route exists for ops + e2e testing.
 *
 * - Default (`{ orgId }`): enqueues the worker job which resolves the
 *   tier via `getCurrentPlan` and PATCHes the basin to match. No-op
 *   locally because `getCurrentPlan` is gated to cloud hosts.
 * - With `tier`: bypasses the billing lookup and runs reconfigure
 *   inline against the given tier. Useful for validating the PATCH
 *   wire shape end-to-end and as a manual override (e.g. enterprise
 *   contract retention).
 */
const BodySchema = z
  .object({
    orgId: z.string(),
    tier: z.enum(["free", "hobby", "pro"]).optional(),
  })
  .strict();

export async function action({ request }: ActionFunctionArgs) {
  await requireAdminApiRequest(request);

  if (!isPerOrgBasinsEnabled()) {
    return json(
      { ok: false, error: "Per-org stream basins are disabled." },
      { status: 400 }
    );
  }

  const text = await request.text();
  const parsed = BodySchema.safeParse(JSON.parse(text));
  if (!parsed.success) {
    return json({ ok: false, error: parsed.error.flatten() }, { status: 400 });
  }

  if (parsed.data.tier) {
    // Direct, synchronous reconfigure with the explicit tier override.
    // Skips the worker queue + billing lookup so the PATCH is verifiable
    // in the response. Errors surface as 500.
    const tier: StreamBasinTier = parsed.data.tier;
    await reconfigureBasinForOrg(parsed.data.orgId, tier);
    return json({ ok: true, mode: "inline", orgId: parsed.data.orgId, tier });
  }

  await commonWorker.enqueue({
    job: "v3.reconfigureStreamBasinForOrg",
    payload: { orgId: parsed.data.orgId },
    id: `reconfigureStreamBasin:${parsed.data.orgId}`,
  });

  return json({ ok: true, mode: "queued", enqueued: parsed.data.orgId });
}
```

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts

Lines changed: 3 additions & 1 deletion
```diff
@@ -40,6 +40,7 @@ const { action, loader } = createActionApiRoute(
       id: true,
       friendlyId: true,
       realtimeStreamsVersion: true,
+      streamBasinName: true,
     },
   });
 
@@ -98,7 +99,8 @@ const { action, loader } = createActionApiRoute(
     try {
       const realtimeStream = getRealtimeStreamInstance(
         authentication.environment,
-        run.realtimeStreamsVersion
+        run.realtimeStreamsVersion,
+        { run }
       );
 
       const records = await realtimeStream.readRecords(
```

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 3 additions & 1 deletion
```diff
@@ -128,7 +128,9 @@ const { action, loader } = createActionApiRoute(
       // hardcode "v2", so the race-check reader has to match.
       // Don't fall through to the run's own `realtimeStreamsVersion`,
       // which only describes the run's run-scoped streams.
-      const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
+      const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
+        session: maybeSession,
+      });
 
       if (realtimeStream instanceof S2RealtimeStreams) {
         const records = await realtimeStream.readSessionStreamRecords(
```

apps/webapp/app/routes/api.v1.sessions.ts

Lines changed: 8 additions & 0 deletions
```diff
@@ -167,6 +167,10 @@ const { action } = createActionApiRoute(
         runtimeEnvironmentId: authentication.environment.id,
         environmentType: authentication.environment.type,
         organizationId: authentication.environment.organizationId,
+        // Stamp the org's S2 basin so realtime reads on this
+        // session's `.in/.out` channels resolve without joining
+        // Organization. Null until per-org basins are provisioned.
+        streamBasinName: authentication.environment.organization.streamBasinName,
       },
       update: { triggerConfig: triggerConfigJson },
     });
@@ -186,6 +190,10 @@ const { action } = createActionApiRoute(
         runtimeEnvironmentId: authentication.environment.id,
         environmentType: authentication.environment.type,
         organizationId: authentication.environment.organizationId,
+        // Stamp the org's S2 basin so realtime reads on this
+        // session's `.in/.out` channels resolve without joining
+        // Organization. Null until per-org basins are provisioned.
+        streamBasinName: authentication.environment.organization.streamBasinName,
       },
     });
   }
```

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 3 additions & 1 deletion
```diff
@@ -81,7 +81,9 @@ const { action, loader } = createActionApiRoute(
     );
   }
 
-  const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
+  const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
+    session,
+  });
 
   if (!(realtimeStream instanceof S2RealtimeStreams)) {
     return json(
```

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts

Lines changed: 6 additions & 2 deletions
```diff
@@ -59,7 +59,9 @@ const { action } = createActionApiRoute(
     });
   }
 
-  const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
+  const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
+    session: maybeSession,
+  });
 
   if (!(realtimeStream instanceof S2RealtimeStreams)) {
     return new Response("Session channels require the S2 realtime backend", {
@@ -122,7 +124,9 @@ const loader = createLoaderApiRoute(
     },
   },
   async ({ params, request, authentication, resource }) => {
-    const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2");
+    const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
+      session: resource.row,
+    });
 
     if (!(realtimeStream instanceof S2RealtimeStreams)) {
       return new Response("Session channels require the S2 realtime backend", {
```
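The commit message's prefix rule (drop the `org/{orgId}` segment for per-org basins, keep it for the shared legacy basin) can be sketched as below. The `run/{runId}` segment is an illustrative assumption; only the presence or absence of the org segment is what this commit describes.

```typescript
function streamKeyPrefix(orgId: string, runId: string, perOrgBasin: boolean): string {
  // A per-org basin already isolates the org, so the org segment is
  // redundant there; the legacy basin is shared, so it stays.
  return perOrgBasin ? `run/${runId}` : `org/${orgId}/run/${runId}`;
}
```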
