Skip to content

Commit 692a257

Browse files
committed
fix(webapp): address coderabbit review on per-org basin migration
- reconfigure admin route: guard `JSON.parse` with try/catch + empty-body check so a malformed POST returns 400 instead of an unhandled 500 (mirror of the backfill route). - session-streams.wait race-check: select `streamBasinName` on the run and pass `{ run, session }` to `getRealtimeStreamInstance` so the resolver picks up the run's stamped basin when the session row is unavailable. - streamBasinProvisioner: 10s `AbortSignal.timeout()` on both `s2CreateBasin` and `s2ReconfigureBasin` so the synchronous org-create path can't hang signup forever on a slow/unresponsive S2. - commonWorker basin handlers: throw when `getCurrentPlan` returns undefined (billing API failure) so redis-worker retries instead of silently defaulting to "free" tier — a reconfigure landing during a transient billing outage would otherwise clip a pro org's retention from 365d to 7d.
1 parent a1d4564 commit 692a257

4 files changed

Lines changed: 38 additions & 6 deletions

File tree

apps/webapp/app/routes/admin.api.v1.stream-basins.reconfigure.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,14 @@ export async function action({ request }: ActionFunctionArgs) {
3838
);
3939
}
4040

41-
const text = await request.text();
42-
const parsed = BodySchema.safeParse(JSON.parse(text));
41+
let parsed: ReturnType<typeof BodySchema.safeParse>;
42+
try {
43+
const text = await request.text();
44+
const raw = text.length > 0 ? JSON.parse(text) : {};
45+
parsed = BodySchema.safeParse(raw);
46+
} catch {
47+
return json({ ok: false, error: "Invalid JSON body" }, { status: 400 });
48+
}
4349
if (!parsed.success) {
4450
return json({ ok: false, error: parsed.error.flatten() }, { status: 400 });
4551
}

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const { action, loader } = createActionApiRoute(
4747
id: true,
4848
friendlyId: true,
4949
realtimeStreamsVersion: true,
50+
streamBasinName: true,
5051
},
5152
});
5253

@@ -129,6 +130,7 @@ const { action, loader } = createActionApiRoute(
129130
// Don't fall through to the run's own `realtimeStreamsVersion`,
130131
// which only describes the run's run-scoped streams.
131132
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
133+
run,
132134
session: maybeSession,
133135
});
134136

apps/webapp/app/services/realtime/streamBasinProvisioner.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@ async function s2CreateBasin(name: string, opts: CreateBasinOptions): Promise<vo
219219
};
220220

221221
const res = await fetch(url, {
222+
// 10s upper bound so the synchronous org-create call site can't
223+
// hang signup forever if S2 is slow / unreachable. Soft-fail at the
224+
// caller swallows the resulting `TimeoutError`; the backfill worker
225+
// retries the unprovisioned org later.
226+
signal: AbortSignal.timeout(10_000),
222227
method: "POST",
223228
headers: {
224229
Authorization: `Bearer ${opts.accessToken}`,
@@ -250,6 +255,10 @@ async function s2ReconfigureBasin(name: string, opts: ReconfigureBasinOptions):
250255
};
251256

252257
const res = await fetch(url, {
258+
// Same 10s ceiling as create. The reconfigure path runs from the
259+
// worker, so a timeout here just fails the job and lets redis-worker
260+
// retry naturally.
261+
signal: AbortSignal.timeout(10_000),
253262
method: "PATCH",
254263
headers: {
255264
Authorization: `Bearer ${opts.accessToken}`,

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,16 @@ function initializeWorker() {
324324
if (!org) return;
325325

326326
const plan = await getCurrentPlan(payload.orgId);
327+
// `plan === undefined` means the billing API call itself failed
328+
// (or the client isn't configured). Throw so redis-worker retries
329+
// — silently defaulting to free would risk a paid org getting
330+
// provisioned with 7d retention if the backfill happened to land
331+
// during a transient billing outage.
332+
if (plan === undefined) {
333+
throw new Error(
334+
`[provisionStreamBasinForOrg] billing plan unavailable for org ${payload.orgId}; will retry`
335+
);
336+
}
327337
// `plan.code` carries the canonical plan id ("free", "v3_hobby_1",
328338
// "v3_pro_1", "enterprise"). `plan.type` is just the
329339
// billing-shape discriminator ("free" | "paid" | "enterprise")
@@ -334,10 +344,15 @@ function initializeWorker() {
334344
},
335345
"v3.reconfigureStreamBasinForOrg": async ({ payload }) => {
336346
const plan = await getCurrentPlan(payload.orgId);
337-
// `plan.code` carries the canonical plan id ("free", "v3_hobby_1",
338-
// "v3_pro_1", "enterprise"). `plan.type` is just the
339-
// billing-shape discriminator ("free" | "paid" | "enterprise")
340-
// and would lump hobby + pro into one bucket.
347+
// Same guard as provision. A reconfigure that silently resolved
348+
// to "free" would clip a pro org's retention from 365d to 7d
349+
// and prematurely expire history — never acceptable. Throw and
350+
// let the worker retry once billing recovers.
351+
if (plan === undefined) {
352+
throw new Error(
353+
`[reconfigureStreamBasinForOrg] billing plan unavailable for org ${payload.orgId}; will retry`
354+
);
355+
}
341356
const tier = planTierFor(plan?.v3Subscription?.plan?.code);
342357
await reconfigureBasinForOrg(payload.orgId, tier);
343358
},

0 commit comments

Comments
 (0)