Skip to content

Commit 97eb08e

Browse files
committed
fix(webapp): address review on per-org basin migration
Two correctness fixes flagged in PR review. - Session-stream race-check resolves basin from `{ session }` only. The append-side writer in `realtime.v1.sessions.$session.$io.append.ts` passes only `{ session }`, and `resolveStreamBasin` prefers `run` over `session` when both are present. During the migration window `run.streamBasinName` and `session.streamBasinName` can differ — writes land in the session's basin, so the race-check has to read from the same one or it falls through to the redis path silently. - Backfill admin route now supports cursor pagination via `afterOrgId` + `nextAfterOrgId`, so deployments with more orgs than `limit` (max 10k per call) can actually page through. `remaining` now counts orgs strictly past the cursor returned, matching the dry-run semantics.
1 parent fc88017 commit 97eb08e

2 files changed

Lines changed: 37 additions & 12 deletions

File tree

apps/webapp/app/routes/admin.api.v1.stream-basins.backfill.ts

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ import { logger } from "~/services/logger.server";
1919
* so OSS / s2-lite installs can't accidentally trigger basin
2020
* operations against a misconfigured backend.
2121
* - `dryRun=true` (default false) returns the count without enqueueing.
22-
* - `limit` (default 1000, max 10000) caps a single invocation. Run
23-
* again with the next batch.
22+
* - `limit` (default 1000, max 10000) caps a single invocation. To
23+
* page through more orgs than `limit`, pass `afterOrgId` from the
24+
* previous response's `nextAfterOrgId`.
2425
* - Each job is keyed `reconcileStreamBasin:<orgId>` so concurrent
2526
* calls converge to one job per org.
2627
*/
@@ -29,6 +30,7 @@ const BodySchema = z
2930
.object({
3031
dryRun: z.boolean().optional().default(false),
3132
limit: z.number().int().min(1).max(10_000).optional().default(1000),
33+
afterOrgId: z.string().optional(),
3234
})
3335
.strict();
3436

@@ -39,6 +41,7 @@ type BackfillResponse = {
3941
pending: number;
4042
remaining: number;
4143
orgIds: string[];
44+
nextAfterOrgId: string | null;
4245
};
4346

4447
export async function action({ request }: ActionFunctionArgs) {
@@ -68,31 +71,42 @@ export async function action({ request }: ActionFunctionArgs) {
6871
return json({ ok: false, error: "Invalid JSON body" }, { status: 400 });
6972
}
7073

71-
const { dryRun, limit } = parsed;
74+
const { dryRun, limit, afterOrgId } = parsed;
7275

7376
// Walk every non-deleted org. The reconcile worker is fast for the
7477
// no-op case (free with null column) so enqueueing for all is fine
7578
// — saves us from doing per-org billing lookups here just to filter
76-
// candidates.
79+
// candidates. Cursor on `id` (cuid is sortable) gives stable paging
80+
// across calls; `createdAt` ties get broken by the cursor.
7781
const candidates = await prisma.organization.findMany({
7882
where: { deletedAt: null },
79-
orderBy: { createdAt: "asc" },
83+
orderBy: { id: "asc" },
8084
take: limit,
85+
...(afterOrgId ? { cursor: { id: afterOrgId }, skip: 1 } : {}),
8186
select: { id: true },
8287
});
8388

84-
const totalOrgs = await prisma.organization.count({
85-
where: { deletedAt: null },
86-
});
89+
const lastReturnedId = candidates[candidates.length - 1]?.id;
90+
const nextAfterOrgId = candidates.length === limit && lastReturnedId ? lastReturnedId : null;
91+
92+
// Orgs still beyond the cursor we just returned. On the final page,
93+
// `lastReturnedId` is undefined (empty result) or the response is short
94+
// of `limit`, so this is 0 — exactly what the caller needs to stop.
95+
const remaining = lastReturnedId
96+
? await prisma.organization.count({
97+
where: { deletedAt: null, id: { gt: lastReturnedId } },
98+
})
99+
: 0;
87100

88101
if (dryRun) {
89102
const response: BackfillResponse = {
90103
ok: true,
91104
dryRun: true,
92105
enqueued: 0,
93106
pending: candidates.length,
94-
remaining: Math.max(0, totalOrgs - candidates.length),
107+
remaining,
95108
orgIds: candidates.map((o) => o.id),
109+
nextAfterOrgId,
96110
};
97111
return json(response);
98112
}
@@ -114,13 +128,18 @@ export async function action({ request }: ActionFunctionArgs) {
114128
}
115129
}
116130

131+
// `remaining` counts orgs strictly past the cursor returned to the
132+
// caller. Enqueue failures don't change this — re-running with the
133+
// same `afterOrgId` would page through the same window and the
134+
// per-org idempotency key keeps it safe.
117135
const response: BackfillResponse = {
118136
ok: true,
119137
dryRun: false,
120138
enqueued,
121139
pending: candidates.length,
122-
remaining: Math.max(0, totalOrgs - enqueued),
140+
remaining,
123141
orgIds: candidates.map((o) => o.id),
142+
nextAfterOrgId,
124143
};
125144

126145
logger.info("[stream-basins-backfill] enqueued reconcile jobs", {

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ const { action, loader } = createActionApiRoute(
4747
id: true,
4848
friendlyId: true,
4949
realtimeStreamsVersion: true,
50-
streamBasinName: true,
5150
},
5251
});
5352

@@ -129,8 +128,15 @@ const { action, loader } = createActionApiRoute(
129128
// hardcode "v2", so the race-check reader has to match.
130129
// Don't fall through to the run's own `realtimeStreamsVersion`,
131130
// which only describes the run's run-scoped streams.
131+
//
132+
// Resolve basin from `session` only (not `run`). The append-side
133+
// writer in `realtime.v1.sessions.$session.$io.append.ts` passes
134+
// only `{ session }`, and `resolveStreamBasin` prefers `run` over
135+
// `session` when both are present. During the per-org-basin
136+
// migration window, `run.streamBasinName` and
137+
// `session.streamBasinName` can differ — the writes land in the
138+
// session's basin, so the race-check has to read from the same.
132139
const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
133-
run,
134140
session: maybeSession,
135141
});
136142

0 commit comments

Comments
 (0)