diff --git a/graphile/graphile-realtime-subscriptions/__tests__/plugin.test.ts b/graphile/graphile-realtime-subscriptions/__tests__/plugin.test.ts index 2901ee3ce..27c7d13c8 100644 --- a/graphile/graphile-realtime-subscriptions/__tests__/plugin.test.ts +++ b/graphile/graphile-realtime-subscriptions/__tests__/plugin.test.ts @@ -4,11 +4,13 @@ * Covers: * - Table discovery via @realtime smart tag * - Subscription field generation (onXxxChanged) - * - Payload type generation (XxxSubscriptionPayload) + * - Payload type generation (XxxSubscriptionPayload) with rowId and overflow fields * - NOTIFY channel naming (realtime:{schema}.{table}) * - Tables without @realtime tag are excluded * - Empty registry produces no fields * - Multiple realtime tables produce multiple fields + * - NOTIFY payload parsing (TG_OP:id1,id2,... and INVALIDATE) + * - Per-subscriber event throttling with configurable limit */ jest.mock('@pgpmjs/logger', () => ({ @@ -23,6 +25,7 @@ jest.mock('@pgpmjs/logger', () => ({ const mockListen = jest.fn(); const mockConstant = jest.fn((val: any) => `constant(${val})`); const mockObject = jest.fn((obj: any) => obj); +const mockLambda = jest.fn((input: any, fn: Function) => fn(input)); const mockContext = jest.fn(() => ({ get: jest.fn((key: string) => `mock-${key}`), })); @@ -32,6 +35,7 @@ jest.mock('grafast', () => ({ listen: mockListen, object: mockObject, constant: mockConstant, + lambda: mockLambda, })); let capturedFactory: Function | null = null; @@ -47,7 +51,13 @@ jest.mock('graphile-utils', () => ({ gql: jest.fn((strings: TemplateStringsArray) => strings.join('')), })); -import { createRealtimeSubscriptionsPlugin, RealtimeSubscriptionsPlugin } from '../src/plugin'; +import { + createRealtimeSubscriptionsPlugin, + RealtimeSubscriptionsPlugin, + parseNotifyPayload, + EventThrottle, + DEFAULT_OVERFLOW_THRESHOLD, +} from '../src/plugin'; // --- Test helpers --- @@ -93,6 +103,124 @@ function createMockBuild(resources: Record, inflectionOverrides: Re // --- Tests --- +describe('parseNotifyPayload', () => { + it('parses INSERT with single row ID', () => { + const result = parseNotifyPayload('INSERT:abc-123'); + expect(result).toEqual({ + event: 'INSERT', + rowIds: ['abc-123'], + overflow: false, + }); + }); + + it('parses UPDATE with multiple row IDs', () => { + const result = parseNotifyPayload('UPDATE:id1,id2,id3'); + expect(result).toEqual({ + event: 'UPDATE', + rowIds: ['id1', 'id2', 'id3'], + overflow: false, + }); + }); + + it('parses DELETE with single row ID', () => { + const result = parseNotifyPayload('DELETE:uuid-456'); + expect(result).toEqual({ + event: 'DELETE', + rowIds: ['uuid-456'], + overflow: false, + }); + }); + + it('parses INVALIDATE as overflow', () => { + const result = parseNotifyPayload('INVALIDATE'); + expect(result).toEqual({ + event: 'INVALIDATE', + rowIds: [], + overflow: true, + }); + }); + + it('handles payload with no colon as bare event', () => { + const result = parseNotifyPayload('INSERT'); + expect(result).toEqual({ + event: 'INSERT', + rowIds: [], + overflow: false, + }); + }); + + it('handles empty string as UNKNOWN', () => { + const result = parseNotifyPayload(''); + expect(result).toEqual({ + event: 'UNKNOWN', + rowIds: [], + overflow: false, + }); + }); + + it('handles operation with empty ID list', () => { + const result = parseNotifyPayload('INSERT:'); + expect(result).toEqual({ + event: 'INSERT', + rowIds: [], + overflow: false, + }); + }); +}); + +describe('EventThrottle', () => { + it('delivers events under threshold', () => { + const throttle = new EventThrottle(3); + + expect(throttle.check()).toBe('deliver'); + expect(throttle.check()).toBe('deliver'); + expect(throttle.check()).toBe('deliver'); + }); + + it('returns overflow on first event exceeding threshold', () => { + const throttle = new EventThrottle(2); + + expect(throttle.check()).toBe('deliver'); + expect(throttle.check()).toBe('deliver'); + expect(throttle.check()).toBe('overflow'); + }); + + it('returns drop for subsequent events after overflow', () => { + const throttle = new EventThrottle(1); + + expect(throttle.check()).toBe('deliver'); + expect(throttle.check()).toBe('overflow'); + expect(throttle.check()).toBe('drop'); + expect(throttle.check()).toBe('drop'); + }); + + it('resets after 1-second window', () => { + const throttle = new EventThrottle(1); + const originalDateNow = Date.now; + + let currentTime = 1000; + Date.now = () => currentTime; + + try { + expect(throttle.check()).toBe('deliver'); + expect(throttle.check()).toBe('overflow'); + + currentTime += 1000; + + expect(throttle.check()).toBe('deliver'); + expect(throttle.check()).toBe('overflow'); + } finally { + Date.now = originalDateNow; + } + }); +}); + +describe('DEFAULT_OVERFLOW_THRESHOLD', () => { + it('is 50', () => { + expect(DEFAULT_OVERFLOW_THRESHOLD).toBe(50); + }); +}); + describe('createRealtimeSubscriptionsPlugin', () => { beforeEach(() => { jest.clearAllMocks(); @@ -199,7 +327,7 @@ describe('createRealtimeSubscriptionsPlugin', () => { expect(result.typeDefs).toContain('onDocumentsChanged(id: UUID): DocumentsSubscriptionPayload'); }); - it('generates payload type with event and row fields', () => { + it('generates payload type with event, row, rowId, and overflow fields', () => { createRealtimeSubscriptionsPlugin(); const codec = createMockCodec('documents', { realtime: true }); @@ -212,6 +340,8 @@ describe('createRealtimeSubscriptionsPlugin', () => { expect(result.typeDefs).toContain('type DocumentsSubscriptionPayload'); expect(result.typeDefs).toContain('event: String!'); expect(result.typeDefs).toContain('documents: Documents'); + expect(result.typeDefs).toContain('rowId: UUID'); + expect(result.typeDefs).toContain('overflow: Boolean!'); }); it('extends Subscription type', () => { @@ -242,12 +372,10 @@ describe('createRealtimeSubscriptionsPlugin', () => { const result = capturedFactory!(build); - // The subscribePlan should reference the correct topic expect(result.plans).toBeDefined(); expect(result.plans['Subscription']).toBeDefined(); expect(result.plans['Subscription']['onProjectsChanged']).toBeDefined(); - // Invoke subscribePlan to verify it calls constant() with the right channel const mockArgs = { get: jest.fn(() => 'test-id') }; result.plans['Subscription']['onProjectsChanged'].subscribePlan(null, mockArgs); @@ -322,7 +450,7 @@ describe('createRealtimeSubscriptionsPlugin', () => { expect(planResult).toBe(mockEvent); }); - it('generates payload type plans with event and row resolvers', () => { + it('generates payload type plans with event, row, rowId, and overflow resolvers', () => { createRealtimeSubscriptionsPlugin(); const codec = createMockCodec('tasks', { realtime: true }); @@ -337,9 +465,11 @@ describe('createRealtimeSubscriptionsPlugin', () => { expect(payloadPlan).toBeDefined(); expect(typeof payloadPlan.event).toBe('function'); expect(typeof payloadPlan.tasks).toBe('function'); + expect(typeof payloadPlan.rowId).toBe('function'); + expect(typeof payloadPlan.overflow).toBe('function'); }); - it('payload event resolver calls parent.get("event")', () => { + it('payload event resolver reads from parsed field', () => { createRealtimeSubscriptionsPlugin(); const codec = createMockCodec('tasks', { realtime: true }); @@ -348,13 +478,13 @@ describe('createRealtimeSubscriptionsPlugin', () => { }); const result = capturedFactory!(build); - const mockParent = { get: jest.fn(() => 'INSERT') }; + const mockParent = { get: jest.fn(() => ({ event: 'INSERT', rowIds: ['id1'], overflow: false })) }; result.plans['TasksSubscriptionPayload'].event(mockParent); - expect(mockParent.get).toHaveBeenCalledWith('event'); + expect(mockParent.get).toHaveBeenCalledWith('parsed'); }); - it('payload row resolver calls resource.get with subscribed id', () => { + it('payload row resolver uses parsed rowId for full collection mode', () => { createRealtimeSubscriptionsPlugin(); const codec = createMockCodec('tasks', { realtime: true }); @@ -367,11 +497,65 @@ describe('createRealtimeSubscriptionsPlugin', () => { }); const result = capturedFactory!(build); - const mockParent = { get: jest.fn(() => 'test-uuid') }; + const mockParent = { get: jest.fn((key: string) => { + if (key === 'parsed') return { event: 'INSERT', rowIds: ['row-uuid'], overflow: false }; + if (key === 'subscribedId') return null; + return null; + }) }; result.plans['TasksSubscriptionPayload'].tasks(mockParent); + expect(mockParent.get).toHaveBeenCalledWith('parsed'); expect(mockParent.get).toHaveBeenCalledWith('subscribedId'); - expect(mockResource.get).toHaveBeenCalledWith({ id: 'test-uuid' }); + expect(mockResource.get).toHaveBeenCalled(); + }); + + it('payload row resolver prefers subscribedId over parsed rowId', () => { + createRealtimeSubscriptionsPlugin(); + + const codec = createMockCodec('tasks', { realtime: true }); + const mockResource = { + ...createMockResource('tasks', codec), + get: jest.fn(), + }; + const build = createMockBuild({ + tasks: mockResource, + }); + + const result = capturedFactory!(build); + const mockParent = { get: jest.fn((key: string) => { + if (key === 'parsed') return { event: 'UPDATE', rowIds: ['row-uuid'], overflow: false }; + if (key === 'subscribedId') return 'subscribed-uuid'; + return null; + }) }; + + result.plans['TasksSubscriptionPayload'].tasks(mockParent); + expect(mockResource.get).toHaveBeenCalled(); + }); + }); + + describe('overflow threshold configuration', () => { + it('uses default threshold of 50 when not specified', () => { + createRealtimeSubscriptionsPlugin(); + + const codec = createMockCodec('projects', { realtime: true }); + const build = createMockBuild({ + projects: createMockResource('projects', codec), + }); + + const result = capturedFactory!(build); + expect(result.plans).toBeDefined(); + }); + + it('accepts custom overflow threshold', () => { + createRealtimeSubscriptionsPlugin({ overflowThreshold: 10 }); + + const codec = createMockCodec('projects', { realtime: true }); + const build = createMockBuild({ + projects: createMockResource('projects', codec), + }); + + const result = capturedFactory!(build); + expect(result.plans).toBeDefined(); }); }); }); diff --git a/graphile/graphile-realtime-subscriptions/src/plugin.ts b/graphile/graphile-realtime-subscriptions/src/plugin.ts index cd7eebf88..31393feaf 100644 --- a/graphile/graphile-realtime-subscriptions/src/plugin.ts +++ b/graphile/graphile-realtime-subscriptions/src/plugin.ts @@ -5,22 +5,31 @@ * subscription fields (onXxxChanged) that use PostgreSQL LISTEN/NOTIFY * for real-time event delivery. * - * Subscription modes (Phase 3a): + * Subscription modes: * - Single record: onXxxChanged(id: UUID!) — subscribe to changes on one row * - Full collection: onXxxChanged (no args) — subscribe to any change on the table * + * NOTIFY payload format (from emit_change trigger): + * - Normal: "INSERT:uuid1,uuid2,..." or "UPDATE:uuid1" or "DELETE:uuid1" + * - Overflow: "INVALIDATE" (when a single statement affects > 50 rows) + * * Event flow: * 1. A row is inserted/updated/deleted - * 2. The emit_change trigger fires pg_notify('realtime:{schema}.{table}', TG_OP) + * 2. The emit_change trigger fires pg_notify with TG_OP:row_ids or INVALIDATE * 3. PostGraphile's pgSubscriber receives the NOTIFY - * 4. The subscription re-queries the source table with RLS enforced - * 5. The client receives { event, row } where row reflects the current state + * 4. The plugin parses the payload and fetches the specific changed row(s) + * 5. The client receives { event, row, rowId, overflow } + * + * Overflow protection: + * - Database-side: statements affecting > 50 rows send INVALIDATE + * - Plugin-side: per-subscriber throttle (default 50 events/second/table) + * drops individual events and sends a single INVALIDATE when exceeded * * RLS enforcement is automatic — resource.get() queries through the * authenticated user's connection with their JWT role applied. */ -import { context as grafastContext, listen, object, constant } from 'grafast'; +import { context as grafastContext, listen, object, constant, lambda } from 'grafast'; import type { GraphileConfig } from 'graphile-config'; import { extendSchema } from 'graphile-utils'; import { Logger } from '@pgpmjs/logger'; @@ -29,6 +38,9 @@ import type { RealtimeSubscriptionsPluginOptions } from './types'; const log = new Logger('graphile-realtime-subscriptions'); +/** Default overflow threshold: events per second per table per subscriber */ +const DEFAULT_OVERFLOW_THRESHOLD = 50; + interface RealtimeTableInfo { resource: any; typeName: string; @@ -40,6 +52,73 @@ interface RealtimeTableInfo { pgTable: string; } +interface ParsedPayload { + event: string; + rowIds: string[]; + overflow: boolean; +} + +/** + * Parse the NOTIFY payload from emit_change. + * Format: "TG_OP:id1,id2,..." or "INVALIDATE" + */ +function parseNotifyPayload(raw: string): ParsedPayload { + if (raw === 'INVALIDATE') { + return { event: 'INVALIDATE', rowIds: [], overflow: true }; + } + + const colonIdx = raw.indexOf(':'); + if (colonIdx === -1) { + return { event: raw || 'UNKNOWN', rowIds: [], overflow: false }; + } + + const event = raw.substring(0, colonIdx); + const idsPart = raw.substring(colonIdx + 1); + const rowIds = idsPart.length > 0 ? idsPart.split(',') : []; + + return { event, rowIds, overflow: false }; +} + +/** + * Per-subscriber, per-table event rate tracker. + * Counts events in a sliding 1-second window. + */ +class EventThrottle { + private windowStart = 0; + private eventCount = 0; + private overflowSent = false; + + constructor(private readonly threshold: number) {} + + /** + * Record an event and return whether it should be delivered. + * Returns 'deliver' for normal events, 'overflow' when the threshold + * is first exceeded, or 'drop' for subsequent events in the same window. + */ + check(): 'deliver' | 'overflow' | 'drop' { + const now = Date.now(); + + if (now - this.windowStart >= 1000) { + this.windowStart = now; + this.eventCount = 0; + this.overflowSent = false; + } + + this.eventCount++; + + if (this.eventCount <= this.threshold) { + return 'deliver'; + } + + if (!this.overflowSent) { + this.overflowSent = true; + return 'overflow'; + } + + return 'drop'; + } +} + function discoverRealtimeTables(build: any): RealtimeTableInfo[] { const { pgRegistry } = build.input; const resources = pgRegistry.pgResources; @@ -90,10 +169,14 @@ function buildTypeDefs(tables: RealtimeTableInfo[]): string { .map(({ payloadTypeName, typeName, rowFieldName }) => `"""Payload delivered when a ${typeName} row changes."""\n` + `type ${payloadTypeName} {\n` + - ` """The DML operation: INSERT, UPDATE, or DELETE."""\n` + + ` """The DML operation: INSERT, UPDATE, DELETE, or INVALIDATE."""\n` + ` event: String!\n` + - ` """The current state of the row (null for DELETE or if RLS denies access)."""\n` + + ` """The current state of the row (null for DELETE, INVALIDATE, or if RLS denies access)."""\n` + ` ${rowFieldName}: ${typeName}\n` + + ` """The ID of the changed row (null for INVALIDATE)."""\n` + + ` rowId: UUID\n` + + ` """True when too many changes occurred and the client should refetch."""\n` + + ` overflow: Boolean!\n` + `}` ) .join('\n\n'); @@ -101,11 +184,16 @@ function buildTypeDefs(tables: RealtimeTableInfo[]): string { return `extend type Subscription {\n${subscriptionFields}\n}\n\n${payloadTypes}`; } -function buildPlans(tables: RealtimeTableInfo[]): Record { +function buildPlans( + tables: RealtimeTableInfo[], + overflowThreshold: number, +): Record { const subscriptionPlans: Record = {}; const allPlans: Record = {}; for (const { resource, fieldName, payloadTypeName, rowFieldName, notifyChannel } of tables) { + const throttle = new EventThrottle(overflowThreshold); + subscriptionPlans[fieldName] = { subscribePlan(_$root: any, args: any) { const $pgSubscriber = (grafastContext() as any).get('pgSubscriber'); @@ -113,8 +201,28 @@ function buildPlans(tables: RealtimeTableInfo[]): Record { const $id = args.get('id'); return listen($pgSubscriber, $topic, ($payload: any) => { + const $parsed = lambda($payload, (raw: unknown) => { + const parsed = parseNotifyPayload(String(raw)); + + const action = parsed.overflow ? 'deliver' : throttle.check(); + + if (action === 'drop') { + return null; + } + + if (action === 'overflow') { + return { + event: 'INVALIDATE', + rowIds: [], + overflow: true, + }; + } + + return parsed; + }); + return object({ - event: $payload, + parsed: $parsed, subscribedId: $id, }); }); @@ -126,11 +234,36 @@ function buildPlans(tables: RealtimeTableInfo[]): Record { allPlans[payloadTypeName] = { event($parent: any) { - return $parent.get('event'); + const $parsed = $parent.get('parsed'); + return lambda($parsed, (p: unknown) => (p as ParsedPayload | null)?.event ?? 'UNKNOWN'); + }, + rowId($parent: any) { + const $parsed = $parent.get('parsed'); + return lambda($parsed, (p: unknown) => { + const parsed = p as ParsedPayload | null; + if (!parsed || parsed.overflow || parsed.rowIds.length === 0) return null; + return parsed.rowIds[0]; + }); + }, + overflow($parent: any) { + const $parsed = $parent.get('parsed'); + return lambda($parsed, (p: unknown) => (p as ParsedPayload | null)?.overflow ?? false); }, [rowFieldName]($parent: any) { - const $id = $parent.get('subscribedId'); - return resource.get({ id: $id }); + const $parsed = $parent.get('parsed'); + const $subscribedId = $parent.get('subscribedId'); + + const $rowId = lambda( + [$parsed, $subscribedId], + (pair: unknown) => { + const [p, subscribedId] = pair as readonly [ParsedPayload | null, string | null]; + if (subscribedId) return subscribedId; + if (!p || p.overflow || p.rowIds.length === 0) return null; + return p.rowIds[0]; + }, + ); + + return resource.get({ id: $rowId }); }, }; } @@ -140,8 +273,10 @@ function buildPlans(tables: RealtimeTableInfo[]): Record { } export function createRealtimeSubscriptionsPlugin( - _options: RealtimeSubscriptionsPluginOptions = {}, + options: RealtimeSubscriptionsPluginOptions = {}, ): GraphileConfig.Plugin { + const overflowThreshold = options.overflowThreshold ?? DEFAULT_OVERFLOW_THRESHOLD; + return extendSchema( (build) => { const tables = discoverRealtimeTables(build); @@ -152,9 +287,10 @@ export function createRealtimeSubscriptionsPlugin( } log.info(`Generating subscription fields for ${tables.length} realtime table(s)`); + log.info(`Overflow threshold: ${overflowThreshold} events/second/table`); const typeDefs = buildTypeDefs(tables); - const plans = buildPlans(tables); + const plans = buildPlans(tables, overflowThreshold); return { typeDefs, plans }; }, @@ -163,3 +299,6 @@ export function createRealtimeSubscriptionsPlugin( } export { createRealtimeSubscriptionsPlugin as RealtimeSubscriptionsPlugin }; + +// Exported for testing +export { parseNotifyPayload, EventThrottle, DEFAULT_OVERFLOW_THRESHOLD }; diff --git a/graphile/graphile-realtime-subscriptions/src/types.ts b/graphile/graphile-realtime-subscriptions/src/types.ts index 5e19dec5a..480dfc55f 100644 --- a/graphile/graphile-realtime-subscriptions/src/types.ts +++ b/graphile/graphile-realtime-subscriptions/src/types.ts @@ -1,10 +1,14 @@ /** * Configuration options for the Realtime Subscriptions Plugin. - * - * Phase 3a supports single-record and full-collection subscription modes - * with no additional configuration required. Future phases will add - * options for poll intervals, batch sizes, and throttling. */ export interface RealtimeSubscriptionsPluginOptions { - // Reserved for Phase 3b+ + /** + * Maximum number of events per table per second before switching to + * overflow (INVALIDATE) mode. When exceeded, the subscription sends + * a single { event: 'INVALIDATE', overflow: true } instead of + * individual row events, signaling the client to refetch. + * + * Default: 50 + */ + overflowThreshold?: number; }