diff --git a/README.md b/README.md index 5b278e85..3824bb6c 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,13 @@ wizard alongside all of our other PostHog product data, and this is very powerful. For example: we could show in-product surveys to people who have used the wizard to improve the experience. +When the user authenticates, the wizard also streams live run state — current +phase, task list, planned events — to `POST /api/projects/{id}/wizard_sessions/` +so the PostHog web app can render real-time progress. Updates are debounced +(250ms) with phase changes flushed immediately; failures fall back silently to +the wizard's debug log without disturbing the TUI. Pass `--no-telemetry` (or +set `POSTHOG_WIZARD_NO_TELEMETRY=1`) to disable. + ## Leave rules behind Supporting agent sessions after we leave is important. There are plenty of ways diff --git a/bin.ts b/bin.ts index 080acb5b..60634ee8 100644 --- a/bin.ts +++ b/bin.ts @@ -126,6 +126,12 @@ const cli = yargs(hideBin(process.argv)) 'Email address for signup (used with --signup)\nenv: POSTHOG_WIZARD_EMAIL', type: 'string', }, + 'no-telemetry': { + default: false, + describe: + 'Disable pushing wizard run state to PostHog\nenv: POSTHOG_WIZARD_NO_TELEMETRY', + type: 'boolean', + }, }) .command( ['$0'], @@ -632,7 +638,9 @@ function runWizard( const installDir = (options.installDir as string) || process.cwd(); const { startTUI } = await import('./src/ui/tui/start-tui.js'); - const { buildSession } = await import('./src/lib/wizard-session.js'); + const { buildSession, RunPhase } = await import( + './src/lib/wizard-session.js' + ); const { TaskStreamPush } = await import('./src/lib/task-stream/index.js'); const { FileDestination } = await import( './src/lib/task-stream/destinations/file.js' @@ -641,6 +649,9 @@ function runWizard( './src/lib/task-stream/destinations/posthog.js' ); const { analytics } = await import('./src/utils/analytics.js'); + const { logToFile } = await import('./src/utils/debug.js'); + type AnyDestination = + import('./src/lib/task-stream/types.js').TaskStreamDestination; const tui = startTUI(WIZARD_VERSION, config.flowKey as any); @@ -658,6 +669,7 @@ function runWizard( integration: options.integration as any, benchmark: options.benchmark as boolean | undefined, yaraReport: options.yaraReport as boolean | undefined, + noTelemetry: options.noTelemetry as boolean | undefined, }); session.workflowLabel = config.flowKey; if (options.skillId) { @@ -666,13 +678,53 @@ function runWizard( tui.store.session = session; - // Task stream — pushes state to external consumers on task changes + // Task stream — subscribes to store changes and pushes run state + // to external consumers (file log + PostHog backend). The PostHog + // destination is omitted when --no-telemetry is set so no HTTP + // request is ever issued. + const taskStreamEnabled = !session.noTelemetry; + const destinations: AnyDestination[] = [new FileDestination()]; + if (taskStreamEnabled) { + destinations.push( + new PostHogDestination({ + getCredentials: () => { + const creds = tui.store.session.credentials; + if (!creds) return null; + return { + host: creds.host, + projectId: creds.projectId, + auth: { kind: 'oauth_session', token: creds.accessToken }, + }; + }, + onError: (err) => logToFile('[task-stream-push]', err.message), + }), + ); + } const taskStream = new TaskStreamPush({ store: tui.store, workflowId: config.flowKey, - destinations: [new FileDestination(), new PostHogDestination()], + destinations, + enabled: taskStreamEnabled, }); - tui.store.onTasksChanged = () => void taskStream.push(); + taskStream.attach(); + + // Flush a terminal-phase push on Ctrl-C so the web app sees the + // run ended in error rather than hanging on the last "running" + // snapshot. A second signal during shutdown still exits normally + // because process.exit is the last line. + let signalled = false; + const onSignal = (): void => { + if (signalled) return; + signalled = true; + if (tui.store.session.runPhase === RunPhase.Running) { + tui.store.setRunPhase(RunPhase.Error); + } + void taskStream.shutdown(2000).finally(() => { + process.exit(130); + }); + }; + process.on('SIGINT', onSignal); + process.on('SIGTERM', onSignal); await tui.store.runReadyHooks(); await tui.store.getGate('intro'); @@ -721,10 +773,12 @@ function runWizard( }); try { - await taskStream.dispose(); + await taskStream.shutdown(2000); } catch (error) { analytics.captureException(error as Error); } + process.off('SIGINT', onSignal); + process.off('SIGTERM', onSignal); tui.unmount(); process.exit(0); } catch (err) { @@ -798,6 +852,7 @@ function runWizardCI( projectId: options.projectId as string | undefined, benchmark: options.benchmark as boolean | undefined, yaraReport: options.yaraReport as boolean | undefined, + noTelemetry: options.noTelemetry as boolean | undefined, ...env, }); session.workflowLabel = config.flowKey; diff --git a/src/lib/constants.ts b/src/lib/constants.ts index d48ebb7f..fd663b17 100644 --- a/src/lib/constants.ts +++ b/src/lib/constants.ts @@ -118,15 +118,17 @@ export const WIZARD_PROVISIONING_SCOPES = [ /** * Scopes the wizard requests during the OAuth login flow. Superset of - * `WIZARD_PROVISIONING_SCOPES` with two scopes that only apply to the login + * `WIZARD_PROVISIONING_SCOPES` with scopes that only apply to the login * path and are not in the provisioning allowlist: - * - introspection lets the wizard introspect its own token - * - health_issue:read used by `wizard doctor` + * - introspection lets the wizard introspect its own token + * - health_issue:read used by `wizard doctor` + * - wizard_session:write stream run state to /api/projects/{id}/wizard_sessions/ */ export const WIZARD_OAUTH_SCOPES = [ ...WIZARD_PROVISIONING_SCOPES, 'introspection', 'health_issue:read', + 'wizard_session:write', ] as const; // ── Wizard run / variants ─────────────────────────────────────────── diff --git a/src/lib/task-stream/__tests__/posthog-destination.test.ts b/src/lib/task-stream/__tests__/posthog-destination.test.ts new file mode 100644 index 00000000..0f42df95 --- /dev/null +++ b/src/lib/task-stream/__tests__/posthog-destination.test.ts @@ -0,0 +1,309 @@ +import { + PostHogDestination, + type PostHogCredentials, +} from '../destinations/posthog'; +import { StreamEvent, type TaskStreamUpdate } from '../types'; +import { RunPhase } from '../../wizard-session'; + +const SAMPLE_CREDS: PostHogCredentials = { + host: 'https://us.posthog.com', + projectId: 42, + auth: { kind: 'oauth_session', token: 'pha_abc' }, +}; + +const SAMPLE_PAYLOAD: TaskStreamUpdate = { + session_id: 'onboarding-posthog_integration-2026-05-20T17:00:00Z', + workflow_id: 'onboarding', + skill_id: 'posthog_integration', + started_at: '2026-05-20T17:00:00Z', + run_phase: RunPhase.Running, + tasks: [], + timestamp: '2026-05-20T17:00:01.000Z', +}; + +function makeResponse( + status: number, + init: { body?: string; headers?: Record } = {}, +): Response { + return new Response(init.body ?? '', { + status, + headers: init.headers, + }); +} + +function makeFetch(responses: Array): jest.Mock { + let i = 0; + return jest.fn(() => { + const next = responses[i++]; + if (next instanceof Error) return Promise.reject(next); + if (!next) return Promise.resolve(makeResponse(500)); + return Promise.resolve(next); + }); +} + +describe('PostHogDestination', () => { + it('POSTs to /api/projects/{id}/wizard_sessions/ with Bearer auth', async () => { + const fetchImpl = makeFetch([makeResponse(201)]); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + + expect(fetchImpl).toHaveBeenCalledTimes(1); + const [url, init] = fetchImpl.mock.calls[0]; + expect(url).toBe('https://us.posthog.com/api/projects/42/wizard_sessions/'); + expect(init.method).toBe('POST'); + expect((init.headers as Record).Authorization).toBe( + 'Bearer pha_abc', + ); + expect((init.headers as Record)['Content-Type']).toBe( + 'application/json', + ); + const body = JSON.parse(init.body as string); + expect(body.session_id).toBe(SAMPLE_PAYLOAD.session_id); + // timestamp is stripped from the wire body + expect(body.timestamp).toBeUndefined(); + }); + + it('no HTTP call when credentials are not yet set', async () => { + const fetchImpl = makeFetch([]); + const dest = new PostHogDestination({ + getCredentials: () => null, + fetchImpl, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + expect(fetchImpl).not.toHaveBeenCalled(); + }); + + // ── Spec §9 case 9 ─────────────────────────────────────────────── + + it('401 disables future pushes for the run', async () => { + const fetchImpl = makeFetch([ + makeResponse(401), + makeResponse(201), + makeResponse(201), + ]); + const onError = jest.fn(); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + onError, + sleep: () => Promise.resolve(), + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + + expect(fetchImpl).toHaveBeenCalledTimes(1); + expect(onError).toHaveBeenCalledTimes(1); + expect(onError.mock.calls[0][0].message).toContain('401'); + }); + + it('403 disables future pushes for the run', async () => { + const fetchImpl = makeFetch([makeResponse(403), makeResponse(201)]); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + sleep: () => Promise.resolve(), + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + + expect(fetchImpl).toHaveBeenCalledTimes(1); + }); + + // ── Spec §9 case 10 ────────────────────────────────────────────── + + it('5xx retries with backoff, gives up after exactly 3 attempts', async () => { + const fetchImpl = makeFetch([ + makeResponse(500), + makeResponse(500), + makeResponse(500), + ]); + const sleep: jest.Mock, [number]> = jest.fn((_ms: number) => + Promise.resolve(), + ); + const onError = jest.fn(); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + sleep, + onError, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + + expect(fetchImpl).toHaveBeenCalledTimes(3); + // Two sleeps between three attempts (500ms, 1000ms). + expect(sleep).toHaveBeenCalledTimes(2); + expect(sleep.mock.calls[0][0]).toBe(500); + expect(sleep.mock.calls[1][0]).toBe(1000); + expect(onError).toHaveBeenCalledTimes(1); + expect(onError.mock.calls[0][0].message).toContain('500'); + }); + + it('network error retries up to 3 attempts', async () => { + const fetchImpl = makeFetch([ + new Error('ECONNREFUSED'), + new Error('ECONNREFUSED'), + new Error('ECONNREFUSED'), + ]); + const sleep: jest.Mock, [number]> = jest.fn((_ms: number) => + Promise.resolve(), + ); + const onError = jest.fn(); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + sleep, + onError, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + + expect(fetchImpl).toHaveBeenCalledTimes(3); + expect(onError).toHaveBeenCalledTimes(1); + }); + + it('5xx succeeds on retry', async () => { + const fetchImpl = makeFetch([makeResponse(503), makeResponse(201)]); + const sleep: jest.Mock, [number]> = jest.fn((_ms: number) => + Promise.resolve(), + ); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + sleep, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + expect(fetchImpl).toHaveBeenCalledTimes(2); + }); + + // ── Spec §9 case 11 ────────────────────────────────────────────── + + it('429 respects Retry-After (seconds), single retry, then gives up', async () => { + const fetchImpl = makeFetch([ + makeResponse(429, { headers: { 'Retry-After': '1' } }), + makeResponse(429, { headers: { 'Retry-After': '1' } }), + ]); + const sleep: jest.Mock, [number]> = jest.fn((_ms: number) => + Promise.resolve(), + ); + const onError = jest.fn(); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + sleep, + onError, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + + expect(fetchImpl).toHaveBeenCalledTimes(2); + expect(sleep).toHaveBeenCalledTimes(1); + expect(sleep.mock.calls[0][0]).toBeGreaterThanOrEqual(1000); + expect(onError).toHaveBeenCalledTimes(1); + expect(onError.mock.calls[0][0].message).toContain('rate limited'); + }); + + it('429 retries successfully on second attempt', async () => { + const fetchImpl = makeFetch([ + makeResponse(429, { headers: { 'Retry-After': '1' } }), + makeResponse(201), + ]); + const sleep: jest.Mock, [number]> = jest.fn((_ms: number) => + Promise.resolve(), + ); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + sleep, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + expect(fetchImpl).toHaveBeenCalledTimes(2); + }); + + // ── Spec §9 case 12 ────────────────────────────────────────────── + + it('400 calls onError, never throws, does not disable', async () => { + const fetchImpl = makeFetch([ + makeResponse(400, { body: 'invalid run_phase' }), + makeResponse(201), + ]); + const onError = jest.fn(); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + onError, + sleep: () => Promise.resolve(), + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + expect(onError).toHaveBeenCalledTimes(1); + expect(onError.mock.calls[0][0].message).toContain('400'); + + // Next send proceeds (not disabled). + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + expect(fetchImpl).toHaveBeenCalledTimes(2); + }); + + it('send() never rejects, even when fetch throws repeatedly', async () => { + const fetchImpl = makeFetch([ + new Error('boom'), + new Error('boom'), + new Error('boom'), + ]); + const onError = jest.fn(); + const dest = new PostHogDestination({ + getCredentials: () => SAMPLE_CREDS, + fetchImpl, + sleep: () => Promise.resolve(), + onError, + }); + + await expect( + dest.send(StreamEvent.Update, SAMPLE_PAYLOAD), + ).resolves.toBeUndefined(); + expect(onError).toHaveBeenCalledTimes(1); + }); + + it('host with trailing slash is normalized', async () => { + const fetchImpl = makeFetch([makeResponse(201)]); + const dest = new PostHogDestination({ + getCredentials: () => ({ + ...SAMPLE_CREDS, + host: 'https://us.posthog.com/', + }), + fetchImpl, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + expect(fetchImpl.mock.calls[0][0]).toBe( + 'https://us.posthog.com/api/projects/42/wizard_sessions/', + ); + }); + + it('personal_api_key auth sends Bearer header', async () => { + const fetchImpl = makeFetch([makeResponse(201)]); + const dest = new PostHogDestination({ + getCredentials: () => ({ + ...SAMPLE_CREDS, + auth: { kind: 'personal_api_key', key: 'phx_xyz' }, + }), + fetchImpl, + }); + + await dest.send(StreamEvent.Update, SAMPLE_PAYLOAD); + const init = fetchImpl.mock.calls[0][1]; + expect((init.headers as Record).Authorization).toBe( + 'Bearer phx_xyz', + ); + }); +}); diff --git a/src/lib/task-stream/__tests__/task-stream-push.test.ts b/src/lib/task-stream/__tests__/task-stream-push.test.ts index 0ff80a88..67404f2e 100644 --- a/src/lib/task-stream/__tests__/task-stream-push.test.ts +++ b/src/lib/task-stream/__tests__/task-stream-push.test.ts @@ -4,10 +4,15 @@ import type { TaskStreamDestination, TaskStreamUpdate } from '../types'; import type { WizardStore, TaskItem } from '../../../ui/tui/store'; import { RunPhase } from '../../wizard-session'; -// Mocks and stuff - type Listener = () => void; +interface MockStoreState { + runPhase: RunPhase; + skillId: string | null; + tasks: TaskItem[]; + eventPlan: unknown[]; +} + function createMockStore(overrides: Partial = {}) { const listeners: Listener[] = []; const state: MockStoreState = { @@ -20,7 +25,11 @@ function createMockStore(overrides: Partial = {}) { const store = { get session() { - return { runPhase: state.runPhase, skillId: state.skillId }; + return { + runPhase: state.runPhase, + skillId: state.skillId, + outroData: null, + }; }, get tasks() { return state.tasks; @@ -35,13 +44,16 @@ function createMockStore(overrides: Partial = {}) { if (i >= 0) listeners.splice(i, 1); }; }, - // mock setter and getter _emit() { for (const cb of listeners) cb(); }, _set(patch: Partial) { Object.assign(state, patch); }, + _setAndEmit(patch: Partial) { + Object.assign(state, patch); + for (const cb of listeners) cb(); + }, _listenerCount() { return listeners.length; }, @@ -50,16 +62,9 @@ function createMockStore(overrides: Partial = {}) { return store as typeof store & WizardStore; } -interface MockStoreState { - runPhase: RunPhase; - skillId: string | null; - tasks: TaskItem[]; - eventPlan: unknown[]; -} - -function createMockDestination( - name = 'test', -): TaskStreamDestination & { calls: Array<[StreamEvent, TaskStreamUpdate]> } { +function createMockDestination(name = 'test'): TaskStreamDestination & { + calls: Array<[StreamEvent, TaskStreamUpdate]>; +} { const calls: Array<[StreamEvent, TaskStreamUpdate]> = []; return { name, @@ -73,19 +78,29 @@ function createMockDestination( function createPush( store: ReturnType, - dest?: ReturnType, + opts: { + dest?: ReturnType; + enabled?: boolean; + } = {}, ) { - const d = dest ?? createMockDestination(); + const dest = opts.dest ?? createMockDestination(); const push = new TaskStreamPush({ store, workflowId: 'test-workflow', - destinations: [d], + destinations: [dest], + enabled: opts.enabled, }); - return { push, dest: d }; + return { push, dest }; } describe('TaskStreamPush', () => { - describe('Coorect order of events', () => { + afterEach(() => { + jest.useRealTimers(); + }); + + // ── Existing event-sequencing behaviour ──────────────────────── + + describe('event ordering (imperative push)', () => { it('first push sends CREATE', async () => { const store = createMockStore(); const { push, dest } = createPush(store); @@ -115,7 +130,7 @@ describe('TaskStreamPush', () => { const store = createMockStore(); const { push, dest } = createPush(store); - await push.push(); // CREATE + await push.push(); store._set({ runPhase: RunPhase.Completed }); await push.push(); @@ -129,7 +144,7 @@ describe('TaskStreamPush', () => { const store = createMockStore(); const { push, dest } = createPush(store); - await push.push(); // CREATE + await push.push(); store._set({ runPhase: RunPhase.Error }); await push.push(); @@ -160,7 +175,9 @@ describe('TaskStreamPush', () => { const payload = dest.calls[0][1]; expect(payload.workflow_id).toBe('test-workflow'); expect(payload.skill_id).toBe('test-skill'); - expect(payload.session_id).toContain('test-workflow-test-skill-'); + expect(payload.session_id).toMatch( + /^test-workflow-test-skill-\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$/, + ); }); it('includes eventPlan when non-empty', async () => { @@ -181,6 +198,18 @@ describe('TaskStreamPush', () => { expect(dest.calls[0][1].event_plan).toBeUndefined(); }); + + it('populates error when phase is Error', async () => { + const store = createMockStore({ runPhase: RunPhase.Error }); + const { push, dest } = createPush(store); + + await push.push(); + + expect(dest.calls[0][1].error).toEqual({ + type: 'wizard_error', + message: expect.any(String), + }); + }); }); describe('destinations are independent', () => { @@ -189,9 +218,7 @@ describe('TaskStreamPush', () => { const good = createMockDestination('good'); const bad: TaskStreamDestination = { name: 'bad', - send: jest.fn(() => { - return Promise.reject(new Error('network down')); - }), + send: jest.fn(() => Promise.reject(new Error('network down'))), }; const push = new TaskStreamPush({ store, @@ -199,7 +226,6 @@ describe('TaskStreamPush', () => { destinations: [bad, good], }); - // This should still happily complete and not block the wizard await push.push(); expect(bad.send).toHaveBeenCalledTimes(1); @@ -210,9 +236,7 @@ describe('TaskStreamPush', () => { const store = createMockStore(); const bad: TaskStreamDestination = { name: 'bad', - send: jest.fn(() => { - return Promise.reject(new Error('fail')); - }), + send: jest.fn(() => Promise.reject(new Error('fail'))), }; const push = new TaskStreamPush({ store, @@ -224,36 +248,239 @@ describe('TaskStreamPush', () => { }); }); - describe('dispose', () => { - it('sends a final push', async () => { + // ── Spec §9 — 12 required test cases ─────────────────────────── + + describe('spec: deterministic session_id', () => { + it('session_id is locked at construction with second-precision ISO', async () => { + const fixedNow = new Date('2026-05-20T17:00:00.123Z'); + jest.useFakeTimers(); + jest.setSystemTime(fixedNow); + const store = createMockStore(); const { push, dest } = createPush(store); - await push.dispose(); + // Even if real time advances before the first push, session_id + // is fixed. + jest.setSystemTime(new Date('2026-05-20T17:05:00.999Z')); + await push.push(); + jest.setSystemTime(new Date('2026-05-20T17:10:00.000Z')); + await push.push(); + + expect(dest.calls[0][1].session_id).toBe( + 'test-workflow-test-skill-2026-05-20T17:00:00Z', + ); + expect(dest.calls[1][1].session_id).toBe(dest.calls[0][1].session_id); + expect(dest.calls[0][1].started_at).toBe('2026-05-20T17:00:00Z'); + + jest.useRealTimers(); + }); + }); + + describe('spec: enabled=false', () => { + it('attach is a no-op and no destination ever fires', () => { + const store = createMockStore({ runPhase: RunPhase.Running }); + const { push, dest } = createPush(store, { enabled: false }); + + push.attach(); + store._setAndEmit({ runPhase: RunPhase.Running }); + store._setAndEmit({ tasks: [taskItem('build')] }); + store._setAndEmit({ runPhase: RunPhase.Completed }); + + expect(store._listenerCount()).toBe(0); + expect(dest.calls).toHaveLength(0); + }); + + it('shutdown resolves immediately when disabled', async () => { + const store = createMockStore({ runPhase: RunPhase.Completed }); + const { push, dest } = createPush(store, { enabled: false }); + + await push.shutdown(2000); + expect(dest.calls).toHaveLength(0); + }); + }); + + describe('spec: idle phase is skipped', () => { + it('store emit with RunPhase.Idle produces no push', async () => { + const store = createMockStore({ runPhase: RunPhase.Idle }); + const { push, dest } = createPush(store); + push.attach(); + + store._setAndEmit({ runPhase: RunPhase.Idle }); + store._setAndEmit({ tasks: [taskItem('build')] }); + + // Flush any pending microtasks. + await flushMicrotasks(); + expect(dest.calls).toHaveLength(0); + }); + }); + + describe('spec: debounces task updates', () => { + it('five rapid emits in the running phase produce one HTTP call with the latest task list', async () => { + jest.useFakeTimers(); + const store = createMockStore({ runPhase: RunPhase.Running }); + const { push, dest } = createPush(store); + push.attach(); + + // First emit fires immediately (phase transition Idle → Running). + store._setAndEmit({ runPhase: RunPhase.Running }); + await flushMicrotasks(); + expect(dest.calls).toHaveLength(1); + expect(dest.calls[0][0]).toBe(StreamEvent.Create); + // Five rapid task emits within 100ms — none fire synchronously. + for (let i = 1; i <= 5; i++) { + store._setAndEmit({ tasks: tasksUpTo(i) }); + await jest.advanceTimersByTimeAsync(20); + } expect(dest.calls).toHaveLength(1); + + // Advance past the 250ms debounce window — one push with the latest list. + await jest.advanceTimersByTimeAsync(250); + await flushMicrotasks(); + + expect(dest.calls).toHaveLength(2); + expect(dest.calls[1][1].tasks).toHaveLength(5); }); + }); - it('sends COMPLETE when runPhase is completed at dispose time', async () => { - const store = createMockStore(); + describe('spec: phase change bypasses debounce', () => { + it('Running → Completed produces an immediate push', async () => { + jest.useFakeTimers(); + const store = createMockStore({ runPhase: RunPhase.Running }); const { push, dest } = createPush(store); + push.attach(); - await push.push(); - store._set({ runPhase: RunPhase.Completed }); - await push.dispose(); + store._setAndEmit({ runPhase: RunPhase.Running }); + await flushMicrotasks(); + expect(dest.calls).toHaveLength(1); + + // Queue a debounced task update. + store._setAndEmit({ tasks: [taskItem('build')] }); + await flushMicrotasks(); + expect(dest.calls).toHaveLength(1); + // Phase change bypasses debounce — immediate push. + store._setAndEmit({ runPhase: RunPhase.Completed }); + await flushMicrotasks(); + expect(dest.calls).toHaveLength(2); expect(dest.calls[1][0]).toBe(StreamEvent.Complete); }); + }); - it('sends ERROR when runPhase is error at dispose time', async () => { - const store = createMockStore(); + describe('spec: coalesces concurrent emits during in-flight push', () => { + it('emits during a slow flush produce one follow-up push with the latest state', async () => { + const store = createMockStore({ runPhase: RunPhase.Running }); + + let resolveFirst!: () => void; + const dest: TaskStreamDestination & { + calls: Array<[StreamEvent, TaskStreamUpdate]>; + } = { + name: 'slow', + calls: [], + send: jest.fn((event: StreamEvent, payload: TaskStreamUpdate) => { + dest.calls.push([event, payload]); + if (dest.calls.length === 1) { + return new Promise((resolve) => { + resolveFirst = resolve; + }); + } + return Promise.resolve(); + }), + }; + + const push = new TaskStreamPush({ + store, + workflowId: 'w', + destinations: [dest], + }); + push.attach(); + + // Push A — phase transition fires immediately, hangs unresolved. + store._setAndEmit({ runPhase: RunPhase.Running }); + await flushMicrotasks(); + expect(dest.calls).toHaveLength(1); + + // Three emits arrive while push A is in flight — all coalesce. + store._setAndEmit({ tasks: tasksUpTo(1) }); + store._setAndEmit({ tasks: tasksUpTo(2) }); + store._setAndEmit({ tasks: tasksUpTo(3) }); + await flushMicrotasks(); + expect(dest.calls).toHaveLength(1); + + resolveFirst(); + await flushMicrotasks(); + await flushMicrotasks(); + + expect(dest.calls).toHaveLength(2); + expect(dest.calls[1][1].tasks).toHaveLength(3); + }); + }); + + describe('spec: shutdown flushes terminal phase', () => { + it('shutdown awaits one final push when phase is terminal', async () => { + const store = createMockStore({ runPhase: RunPhase.Completed }); const { push, dest } = createPush(store); + push.attach(); - await push.push(); - store._set({ runPhase: RunPhase.Error }); - await push.dispose(); + await push.shutdown(2000); + expect(dest.calls).toHaveLength(1); + expect(dest.calls[0][0]).toBe(StreamEvent.Create); + expect(dest.calls[0][1].run_phase).toBe(RunPhase.Completed); + }); - expect(dest.calls[1][0]).toBe(StreamEvent.Error); + it('shutdown skips final push when phase is not terminal', async () => { + const store = createMockStore({ runPhase: RunPhase.Running }); + const { push, dest } = createPush(store); + push.attach(); + + await push.shutdown(2000); + expect(dest.calls).toHaveLength(0); + }); + }); + + describe('spec: shutdown honours timeout', () => { + it('shutdown returns even when destination hangs forever', async () => { + jest.useFakeTimers(); + const store = createMockStore({ runPhase: RunPhase.Completed }); + const hanging: TaskStreamDestination = { + name: 'hangs', + send: jest.fn(() => new Promise(() => undefined)), + }; + const push = new TaskStreamPush({ + store, + workflowId: 'w', + destinations: [hanging], + }); + + const shutdown = push.shutdown(500); + jest.advanceTimersByTime(500); + await expect(shutdown).resolves.toBeUndefined(); + jest.useRealTimers(); }); }); }); + +// ── Helpers ──────────────────────────────────────────────────────── + +/** + * Yield enough microtask ticks for chained awaits to settle. + * Avoids setImmediate / setTimeout so it works under fake timers. + */ +async function flushMicrotasks(): Promise { + for (let i = 0; i < 20; i++) { + await Promise.resolve(); + } +} + +function taskItem(label: string): TaskItem { + return { + label, + activeForm: label, + status: 'pending' as TaskItem['status'], + done: false, + }; +} + +function tasksUpTo(n: number): TaskItem[] { + return Array.from({ length: n }, (_, i) => taskItem(`task-${i + 1}`)); +} diff --git a/src/lib/task-stream/destinations/posthog.ts b/src/lib/task-stream/destinations/posthog.ts index f9644806..e0bebcec 100644 --- a/src/lib/task-stream/destinations/posthog.ts +++ b/src/lib/task-stream/destinations/posthog.ts @@ -1,14 +1,206 @@ +/** + * PostHog destination — pushes wizard run state to the PostHog backend + * via `POST /api/projects/{project_id}/wizard_sessions/`. + * + * Failure handling is fail-silent: never throws to the caller, never + * writes to stdout/stderr, never blocks the agent. Errors flow through + * the optional `onError` callback for the wizard's debug log. + * + * Retry policy: + * 5xx / network → exponential backoff base 500ms cap 8s, max 3 attempts + * 429 → honour `Retry-After` (seconds), single retry + * 401 / 403 → disable for the rest of the run, no further pushes + * 400 → give up for this push, do not disable + * other 4xx → give up for this push, do not disable + */ + import type { TaskStreamDestination, TaskStreamUpdate, StreamEvent, } from '../types'; +export type PostHogAuth = + | { kind: 'oauth_session'; token: string } + | { kind: 'personal_api_key'; key: string }; + +export interface PostHogCredentials { + host: string; + projectId: number; + auth: PostHogAuth; +} + +export interface PostHogDestinationOptions { + /** + * Lazy credential resolver — called on every send. Returns null + * before authentication has completed; in that case the send is a + * no-op (no HTTP request). + */ + getCredentials: () => PostHogCredentials | null; + /** Receives every error for the wizard's internal logfile. */ + onError?: (err: Error) => void; + /** Override for tests. Defaults to global fetch. */ + fetchImpl?: typeof fetch; + /** Override for tests. Defaults to setTimeout. */ + sleep?: (ms: number) => Promise; +} + +const MAX_ATTEMPTS = 3; +const BASE_BACKOFF_MS = 500; +const MAX_BACKOFF_MS = 8000; +const DEFAULT_RETRY_AFTER_MS = 1000; + +function defaultSleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function parseRetryAfter(value: string | null): number { + if (!value) return DEFAULT_RETRY_AFTER_MS; + const seconds = Number(value); + if (Number.isFinite(seconds) && seconds >= 0) { + return Math.ceil(seconds * 1000); + } + // HTTP-date form — best-effort. + const date = Date.parse(value); + if (Number.isFinite(date)) { + return Math.max(0, date - Date.now()); + } + return DEFAULT_RETRY_AFTER_MS; +} + +function authHeader(auth: PostHogAuth): string { + return auth.kind === 'personal_api_key' + ? `Bearer ${auth.key}` + : `Bearer ${auth.token}`; +} + +/** + * Strip the internal-only `timestamp` field before sending. The + * backend schema in the RFC does not define it. + */ +function toWirePayload( + payload: TaskStreamUpdate, +): Omit { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { timestamp: _unused, ...rest } = payload; + return rest; +} + export class PostHogDestination implements TaskStreamDestination { readonly name = 'posthog'; - send(_event: StreamEvent, _payload: TaskStreamUpdate): Promise { - // TODO: implement when the PostHog API surface is defined. - return Promise.resolve(); + private readonly getCredentials: () => PostHogCredentials | null; + private readonly onError: (err: Error) => void; + private readonly fetchImpl: typeof fetch; + private readonly sleep: (ms: number) => Promise; + + private disabled = false; + + constructor(opts: PostHogDestinationOptions) { + this.getCredentials = opts.getCredentials; + this.onError = opts.onError ?? (() => undefined); + this.fetchImpl = opts.fetchImpl ?? ((...args) => fetch(...args)); + this.sleep = opts.sleep ?? defaultSleep; + } + + async send(_event: StreamEvent, payload: TaskStreamUpdate): Promise { + if (this.disabled) return; + const creds = this.getCredentials(); + if (!creds) return; + + await this.postWithRetry(creds, toWirePayload(payload)); + } + + private buildRequest( + creds: PostHogCredentials, + body: object, + ): { url: string; init: Parameters[1] } { + const url = `${creds.host.replace(/\/$/, '')}/api/projects/${ + creds.projectId + }/wizard_sessions/`; + return { + url, + init: { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: authHeader(creds.auth), + }, + body: JSON.stringify(body), + }, + }; + } + + private async postWithRetry( + creds: PostHogCredentials, + body: object, + ): Promise { + const { url, init } = this.buildRequest(creds, body); + let attempt = 0; + let backoff = BASE_BACKOFF_MS; + let retriedAfter429 = false; + + while (attempt < MAX_ATTEMPTS) { + attempt += 1; + let response: Response; + try { + response = await this.fetchImpl(url, init); + } catch (err) { + if (attempt >= MAX_ATTEMPTS) { + this.onError(err instanceof Error ? err : new Error(String(err))); + return; + } + await this.sleep(backoff); + backoff = Math.min(backoff * 2, MAX_BACKOFF_MS); + continue; + } + + if (response.ok) return; + + const status = response.status; + + if (status === 401 || status === 403) { + this.disabled = true; + this.onError(new Error(`wizard_sessions auth failed: ${status}`)); + return; + } + + if (status === 429) { + if (retriedAfter429) { + this.onError(new Error('wizard_sessions rate limited')); + return; + } + retriedAfter429 = true; + const wait = parseRetryAfter(response.headers.get('Retry-After')); + await this.sleep(wait); + // Don't count this against the 5xx attempt budget. + attempt -= 1; + continue; + } + + if (status >= 500) { + if (attempt >= MAX_ATTEMPTS) { + this.onError(new Error(`wizard_sessions server error: ${status}`)); + return; + } + await this.sleep(backoff); + backoff = Math.min(backoff * 2, MAX_BACKOFF_MS); + continue; + } + + if (status === 400) { + let detail = ''; + try { + detail = await response.text(); + } catch { + // ignore + } + this.onError(new Error(`wizard_sessions bad request (400): ${detail}`)); + return; + } + + this.onError(new Error(`wizard_sessions unexpected status: ${status}`)); + return; + } } } diff --git a/src/lib/task-stream/task-stream-push.ts b/src/lib/task-stream/task-stream-push.ts index 8d5927ab..7749bb39 100644 --- a/src/lib/task-stream/task-stream-push.ts +++ b/src/lib/task-stream/task-stream-push.ts @@ -1,19 +1,38 @@ /** * Task-stream push — subscribes to WizardStore, builds payloads, * and fans out async to all registered destinations. + * + * Behaviour: + * - `attach(store)` subscribe to store changes + * - task updates debounced 250ms (trailing edge) + * - phase transitions flush immediately, bypass debounce + * - RunPhase.Idle skipped (no push) + * - enabled === false attach is a no-op + * - shutdown(timeoutMs) cancel pending, flush terminal phase + * with timeout, never throw + * + * Concurrency: only one fan-out at a time. Emits during an in-flight + * push are coalesced — at most one follow-up push fires with the + * latest state once the current one settles. */ import type { WizardStore, TaskItem } from '../../ui/tui/store'; import { TaskStatus } from '../../ui/wizard-ui'; -import { RunPhase } from '../wizard-session'; +import { RunPhase, OutroKind, type OutroData } from '../wizard-session'; import { type TaskStreamDestination, type TaskStreamUpdate, type StreamTask, + type TaskStreamError, StreamTaskStatus, StreamEvent, } from './types'; +/** Trailing-edge debounce window for non-phase-change emits. */ +const DEBOUNCE_MS = 250; +/** Default shutdown timeout for the final terminal flush. */ +const DEFAULT_SHUTDOWN_TIMEOUT_MS = 2000; + const STATUS_MAP: Record = { [TaskStatus.Pending]: StreamTaskStatus.Pending, [TaskStatus.InProgress]: StreamTaskStatus.InProgress, @@ -28,10 +47,29 @@ function buildTasks(items: TaskItem[]): StreamTask[] { })); } +/** Drop ".SSSZ" → "Z" so session_id segments stay routing-safe. */ +function secondPrecisionIso(d: Date): string { + return d.toISOString().replace(/\.\d{3}Z$/, 'Z'); +} + +function buildError( + phase: RunPhase, + outroData: OutroData | null, +): TaskStreamError | undefined { + if (phase !== RunPhase.Error) return undefined; + if (outroData?.kind === OutroKind.Error) { + const message = outroData.message ?? outroData.body ?? 'Wizard run failed'; + return { type: 'wizard_error', message }; + } + return { type: 'wizard_error', message: 'Wizard run failed' }; +} + export interface TaskStreamPushOptions { store: WizardStore; workflowId: string; destinations: TaskStreamDestination[]; + /** When false, `attach` is a no-op and no destination ever fires. */ + enabled?: boolean; } export class TaskStreamPush { @@ -39,39 +77,171 @@ export class TaskStreamPush { private readonly destinations: TaskStreamDestination[]; private readonly startedAt: string; private readonly workflowId: string; + private readonly sessionId: string; - private sessionId: string | null = null; + private enabled: boolean; private created = false; + private lastPushedPhase: RunPhase | null = null; + + private unsubscribe: (() => void) | null = null; + private debounceTimer: ReturnType | null = null; + private inFlight: Promise | null = null; + private needsAnotherPush = false; + private shuttingDown = false; constructor(opts: TaskStreamPushOptions) { this.store = opts.store; this.workflowId = opts.workflowId; this.destinations = opts.destinations; - this.startedAt = new Date().toISOString(); + this.enabled = opts.enabled ?? true; + this.startedAt = secondPrecisionIso(new Date()); + // skillId may not be set yet — fall back to workflowId so the + // session_id is stable for the whole run regardless of when the + // workflow metadata is populated. + const skillId = this.store.session.skillId ?? this.workflowId; + this.sessionId = `${this.workflowId}-${skillId}-${this.startedAt}`; + } + + /** + * Subscribe to store changes. No-op when `enabled === false`. + * Idempotent — repeat calls are ignored. + */ + attach(store?: WizardStore): void { + if (!this.enabled) return; + if (this.unsubscribe) return; + const target = store ?? this.store; + this.unsubscribe = target.subscribe(() => this.onStoreChange()); } - /** Send a final push. */ - async dispose(): Promise { - await this.push(); + /** Stop subscribing. Does not flush. */ + detach(): void { + if (this.unsubscribe) { + this.unsubscribe(); + this.unsubscribe = null; + } + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + this.debounceTimer = null; + } } + /** + * Cancel pending debounce, flush one final push if the current + * phase is terminal, and resolve. Never throws. Bounded by + * `timeoutMs` — if a destination hangs, this returns anyway. + */ + async shutdown( + timeoutMs: number = DEFAULT_SHUTDOWN_TIMEOUT_MS, + ): Promise { + this.shuttingDown = true; + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + this.debounceTimer = null; + } + this.detach(); + if (!this.enabled) return; + + const phase = this.store.session.runPhase; + const isTerminal = phase === RunPhase.Completed || phase === RunPhase.Error; + if (!isTerminal) return; + + const flush = this.flush(); + if (timeoutMs <= 0) return; + await Promise.race([ + flush, + new Promise((resolve) => setTimeout(resolve, timeoutMs)), + ]); + } + + /** + * Imperative push — fires immediately regardless of phase. Kept as + * the building block for both subscription-driven and direct calls. + */ async push(): Promise { - const { session, tasks, eventPlan } = this.store; - const skillId = session.skillId ?? this.workflowId; + await this.flush(); + } + + // ── Internal ──────────────────────────────────────────────────── + + private onStoreChange(): void { + if (!this.enabled || this.shuttingDown) return; + const phase = this.store.session.runPhase; + if (phase === RunPhase.Idle) return; + + // A push is already in flight — coalesce. The in-flight push's + // settle handler will trigger one follow-up with the latest state. + if (this.inFlight) { + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + this.debounceTimer = null; + } + this.needsAnotherPush = true; + return; + } + + const phaseChanged = phase !== this.lastPushedPhase; + if (phaseChanged) { + // Phase transitions bypass the debounce: the web app needs to + // see Running → Completed as soon as it lands. + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + this.debounceTimer = null; + } + void this.flush(); + return; + } + + // Task updates can arrive faster than we want to push. Debounce + // them — the last update in a burst wins. + if (this.debounceTimer) return; + this.debounceTimer = setTimeout(() => { + this.debounceTimer = null; + void this.flush(); + }, DEBOUNCE_MS); + } - // Lock session ID on first push so it stays stable - if (!this.sessionId) { - this.sessionId = `${this.workflowId}-${skillId}-${this.startedAt}`; + /** + * Fan out the current state to every destination. Serialized — if + * a flush is already running, mark "needs another" and let the + * in-flight one schedule the follow-up when it settles. + */ + private flush(): Promise { + if (this.inFlight) { + this.needsAnotherPush = true; + return this.inFlight; } + const run = async (): Promise => { + try { + await this.sendOnce(); + } finally { + this.inFlight = null; + if (this.needsAnotherPush) { + this.needsAnotherPush = false; + // Re-enter to push the latest snapshot. + await this.flush(); + } + } + }; + + this.inFlight = run(); + return this.inFlight; + } + + private async sendOnce(): Promise { + const { session, tasks, eventPlan } = this.store; + const skillId = session.skillId ?? this.workflowId; + const phase = session.runPhase; + const payload: TaskStreamUpdate = { session_id: this.sessionId, workflow_id: this.workflowId, skill_id: skillId, started_at: this.startedAt, - run_phase: session.runPhase, + run_phase: phase, tasks: buildTasks(tasks), event_plan: eventPlan.length > 0 ? eventPlan : undefined, + error: buildError(phase, session.outroData), timestamp: new Date().toISOString(), }; @@ -79,17 +249,21 @@ export class TaskStreamPush { if (!this.created) { this.created = true; event = StreamEvent.Create; - } else if (payload.run_phase === RunPhase.Completed) { + } else if (phase === RunPhase.Completed) { event = StreamEvent.Complete; - } else if (payload.run_phase === RunPhase.Error) { + } else if (phase === RunPhase.Error) { event = StreamEvent.Error; } else { event = StreamEvent.Update; } + this.lastPushedPhase = phase; + await Promise.all( - // eslint-disable-next-line @typescript-eslint/no-empty-function - this.destinations.map((d) => d.send(event, payload).catch(() => {})), + this.destinations.map((d) => + // eslint-disable-next-line @typescript-eslint/no-empty-function + d.send(event, payload).catch(() => {}), + ), ); } } diff --git a/src/lib/wizard-session.ts b/src/lib/wizard-session.ts index 4a42905c..5a5bf42f 100644 --- a/src/lib/wizard-session.ts +++ b/src/lib/wizard-session.ts @@ -108,6 +108,7 @@ export interface WizardSession { benchmark: boolean; yaraReport: boolean; projectId?: number; + noTelemetry: boolean; // From detection + screens setupConfirmed: boolean; @@ -193,6 +194,7 @@ export function buildSession(args: { benchmark?: boolean; yaraReport?: boolean; projectId?: string; + noTelemetry?: boolean; }): WizardSession { return { debug: args.debug ?? false, @@ -209,6 +211,7 @@ export function buildSession(args: { benchmark: args.benchmark ?? false, yaraReport: args.yaraReport ?? false, projectId: parseProjectIdArg(args.projectId), + noTelemetry: args.noTelemetry ?? false, setupConfirmed: false, integration: args.integration ?? null,