Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 196 additions & 12 deletions graphile/graphile-realtime-subscriptions/__tests__/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
* Covers:
* - Table discovery via @realtime smart tag
* - Subscription field generation (onXxxChanged)
* - Payload type generation (XxxSubscriptionPayload)
* - Payload type generation (XxxSubscriptionPayload) with rowId and overflow fields
* - NOTIFY channel naming (realtime:{schema}.{table})
* - Tables without @realtime tag are excluded
* - Empty registry produces no fields
* - Multiple realtime tables produce multiple fields
* - NOTIFY payload parsing (TG_OP:id1,id2,... and INVALIDATE)
* - Per-subscriber event throttling with configurable limit
*/

jest.mock('@pgpmjs/logger', () => ({
Expand All @@ -23,6 +25,7 @@ jest.mock('@pgpmjs/logger', () => ({
const mockListen = jest.fn();
const mockConstant = jest.fn((val: any) => `constant(${val})`);
const mockObject = jest.fn((obj: any) => obj);
const mockLambda = jest.fn((input: any, fn: Function) => fn(input));
const mockContext = jest.fn(() => ({
get: jest.fn((key: string) => `mock-${key}`),
}));
Expand All @@ -32,6 +35,7 @@ jest.mock('grafast', () => ({
listen: mockListen,
object: mockObject,
constant: mockConstant,
lambda: mockLambda,
}));

let capturedFactory: Function | null = null;
Expand All @@ -47,7 +51,13 @@ jest.mock('graphile-utils', () => ({
gql: jest.fn((strings: TemplateStringsArray) => strings.join('')),
}));

import { createRealtimeSubscriptionsPlugin, RealtimeSubscriptionsPlugin } from '../src/plugin';
import {
createRealtimeSubscriptionsPlugin,
RealtimeSubscriptionsPlugin,
parseNotifyPayload,
EventThrottle,
DEFAULT_OVERFLOW_THRESHOLD,
} from '../src/plugin';

// --- Test helpers ---

Expand Down Expand Up @@ -93,6 +103,124 @@ function createMockBuild(resources: Record<string, any>, inflectionOverrides: Re

// --- Tests ---

describe('parseNotifyPayload', () => {
it('parses INSERT with single row ID', () => {
const result = parseNotifyPayload('INSERT:abc-123');
expect(result).toEqual({
event: 'INSERT',
rowIds: ['abc-123'],
overflow: false,
});
});

it('parses UPDATE with multiple row IDs', () => {
const result = parseNotifyPayload('UPDATE:id1,id2,id3');
expect(result).toEqual({
event: 'UPDATE',
rowIds: ['id1', 'id2', 'id3'],
overflow: false,
});
});

it('parses DELETE with single row ID', () => {
const result = parseNotifyPayload('DELETE:uuid-456');
expect(result).toEqual({
event: 'DELETE',
rowIds: ['uuid-456'],
overflow: false,
});
});

it('parses INVALIDATE as overflow', () => {
const result = parseNotifyPayload('INVALIDATE');
expect(result).toEqual({
event: 'INVALIDATE',
rowIds: [],
overflow: true,
});
});

it('handles payload with no colon as bare event', () => {
const result = parseNotifyPayload('INSERT');
expect(result).toEqual({
event: 'INSERT',
rowIds: [],
overflow: false,
});
});

it('handles empty string as UNKNOWN', () => {
const result = parseNotifyPayload('');
expect(result).toEqual({
event: 'UNKNOWN',
rowIds: [],
overflow: false,
});
});

it('handles operation with empty ID list', () => {
const result = parseNotifyPayload('INSERT:');
expect(result).toEqual({
event: 'INSERT',
rowIds: [],
overflow: false,
});
});
});

describe('EventThrottle', () => {
it('delivers events under threshold', () => {
const throttle = new EventThrottle(3);

expect(throttle.check()).toBe('deliver');
expect(throttle.check()).toBe('deliver');
expect(throttle.check()).toBe('deliver');
});

it('returns overflow on first event exceeding threshold', () => {
const throttle = new EventThrottle(2);

expect(throttle.check()).toBe('deliver');
expect(throttle.check()).toBe('deliver');
expect(throttle.check()).toBe('overflow');
});

it('returns drop for subsequent events after overflow', () => {
const throttle = new EventThrottle(1);

expect(throttle.check()).toBe('deliver');
expect(throttle.check()).toBe('overflow');
expect(throttle.check()).toBe('drop');
expect(throttle.check()).toBe('drop');
});

it('resets after 1-second window', () => {
const throttle = new EventThrottle(1);
const originalDateNow = Date.now;

let currentTime = 1000;
Date.now = () => currentTime;

try {
expect(throttle.check()).toBe('deliver');
expect(throttle.check()).toBe('overflow');

currentTime += 1000;

expect(throttle.check()).toBe('deliver');
expect(throttle.check()).toBe('overflow');
} finally {
Date.now = originalDateNow;
}
});
});

describe('DEFAULT_OVERFLOW_THRESHOLD', () => {
it('is 50', () => {
expect(DEFAULT_OVERFLOW_THRESHOLD).toBe(50);
});
});

describe('createRealtimeSubscriptionsPlugin', () => {
beforeEach(() => {
jest.clearAllMocks();
Expand Down Expand Up @@ -199,7 +327,7 @@ describe('createRealtimeSubscriptionsPlugin', () => {
expect(result.typeDefs).toContain('onDocumentsChanged(id: UUID): DocumentsSubscriptionPayload');
});

it('generates payload type with event and row fields', () => {
it('generates payload type with event, row, rowId, and overflow fields', () => {
createRealtimeSubscriptionsPlugin();

const codec = createMockCodec('documents', { realtime: true });
Expand All @@ -212,6 +340,8 @@ describe('createRealtimeSubscriptionsPlugin', () => {
expect(result.typeDefs).toContain('type DocumentsSubscriptionPayload');
expect(result.typeDefs).toContain('event: String!');
expect(result.typeDefs).toContain('documents: Documents');
expect(result.typeDefs).toContain('rowId: UUID');
expect(result.typeDefs).toContain('overflow: Boolean!');
});

it('extends Subscription type', () => {
Expand Down Expand Up @@ -242,12 +372,10 @@ describe('createRealtimeSubscriptionsPlugin', () => {

const result = capturedFactory!(build);

// The subscribePlan should reference the correct topic
expect(result.plans).toBeDefined();
expect(result.plans['Subscription']).toBeDefined();
expect(result.plans['Subscription']['onProjectsChanged']).toBeDefined();

// Invoke subscribePlan to verify it calls constant() with the right channel
const mockArgs = { get: jest.fn(() => 'test-id') };
result.plans['Subscription']['onProjectsChanged'].subscribePlan(null, mockArgs);

Expand Down Expand Up @@ -322,7 +450,7 @@ describe('createRealtimeSubscriptionsPlugin', () => {
expect(planResult).toBe(mockEvent);
});

it('generates payload type plans with event and row resolvers', () => {
it('generates payload type plans with event, row, rowId, and overflow resolvers', () => {
createRealtimeSubscriptionsPlugin();

const codec = createMockCodec('tasks', { realtime: true });
Expand All @@ -337,9 +465,11 @@ describe('createRealtimeSubscriptionsPlugin', () => {
expect(payloadPlan).toBeDefined();
expect(typeof payloadPlan.event).toBe('function');
expect(typeof payloadPlan.tasks).toBe('function');
expect(typeof payloadPlan.rowId).toBe('function');
expect(typeof payloadPlan.overflow).toBe('function');
});

it('payload event resolver calls parent.get("event")', () => {
it('payload event resolver reads from parsed field', () => {
createRealtimeSubscriptionsPlugin();

const codec = createMockCodec('tasks', { realtime: true });
Expand All @@ -348,13 +478,13 @@ describe('createRealtimeSubscriptionsPlugin', () => {
});

const result = capturedFactory!(build);
const mockParent = { get: jest.fn(() => 'INSERT') };
const mockParent = { get: jest.fn(() => ({ event: 'INSERT', rowIds: ['id1'], overflow: false })) };

result.plans['TasksSubscriptionPayload'].event(mockParent);
expect(mockParent.get).toHaveBeenCalledWith('event');
expect(mockParent.get).toHaveBeenCalledWith('parsed');
});

it('payload row resolver calls resource.get with subscribed id', () => {
it('payload row resolver uses parsed rowId for full collection mode', () => {
createRealtimeSubscriptionsPlugin();

const codec = createMockCodec('tasks', { realtime: true });
Expand All @@ -367,11 +497,65 @@ describe('createRealtimeSubscriptionsPlugin', () => {
});

const result = capturedFactory!(build);
const mockParent = { get: jest.fn(() => 'test-uuid') };
const mockParent = { get: jest.fn((key: string) => {
if (key === 'parsed') return { event: 'INSERT', rowIds: ['row-uuid'], overflow: false };
if (key === 'subscribedId') return null;
return null;
}) };

result.plans['TasksSubscriptionPayload'].tasks(mockParent);
expect(mockParent.get).toHaveBeenCalledWith('parsed');
expect(mockParent.get).toHaveBeenCalledWith('subscribedId');
expect(mockResource.get).toHaveBeenCalledWith({ id: 'test-uuid' });
expect(mockResource.get).toHaveBeenCalled();
});

it('payload row resolver prefers subscribedId over parsed rowId', () => {
createRealtimeSubscriptionsPlugin();

const codec = createMockCodec('tasks', { realtime: true });
const mockResource = {
...createMockResource('tasks', codec),
get: jest.fn(),
};
const build = createMockBuild({
tasks: mockResource,
});

const result = capturedFactory!(build);
const mockParent = { get: jest.fn((key: string) => {
if (key === 'parsed') return { event: 'UPDATE', rowIds: ['row-uuid'], overflow: false };
if (key === 'subscribedId') return 'subscribed-uuid';
return null;
}) };

result.plans['TasksSubscriptionPayload'].tasks(mockParent);
expect(mockResource.get).toHaveBeenCalled();
});
});

describe('overflow threshold configuration', () => {
it('uses default threshold of 50 when not specified', () => {
createRealtimeSubscriptionsPlugin();

const codec = createMockCodec('projects', { realtime: true });
const build = createMockBuild({
projects: createMockResource('projects', codec),
});

const result = capturedFactory!(build);
expect(result.plans).toBeDefined();
});

it('accepts custom overflow threshold', () => {
createRealtimeSubscriptionsPlugin({ overflowThreshold: 10 });

const codec = createMockCodec('projects', { realtime: true });
const build = createMockBuild({
projects: createMockResource('projects', codec),
});

const result = capturedFactory!(build);
expect(result.plans).toBeDefined();
});
});
});
Loading
Loading