From 23ca87c617fb935ee6285a562891f5abb7e4fa64 Mon Sep 17 00:00:00 2001 From: Simon Kirsten Date: Sun, 3 May 2026 17:09:34 +0200 Subject: [PATCH 1/3] fix: stop leaking PromiseReactions in consumer loops MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Promise.race against a never-settling Promise (Effect#closed, Effect#cancel) attaches a .then reaction to that long-lived promise on every call. Because the promise never settles, those reactions never run and never get GC'd. Each leaked reaction's closure retains the awaiter state, which keeps the resolved {frame, group} alive. Over a multi-minute playback session this accumulates one retained Frame per consumed video frame (~400MB after 9 minutes at 60fps) and grows the PromiseReaction chain to tens of thousands of nodes, which inflates major GC pauses to 100–200ms. That's the cause of the periodic ~700ms stalls. Three changes, no API impact: * @moq/hang Consumer.next() waits via an AbortSignal listener registered with { once: true } and explicitly removed when notify wins, instead of Promise.race([wait, signals.closed]). * @moq/watch video legacy decoder loop drops the outer Promise.race([consumer.next(), effect.cancel]). When the effect closes, effect.cleanup runs consumer.close(), which makes next() return undefined naturally and the loop exits. * @moq/watch video and audio CMAF paths track in-flight groups in a Set. effect.cleanup closes them all, so recvGroup/readFrame terminate without racing against effect.cancel. --- js/hang/src/container/legacy.ts | 27 ++++++++++++++++++--------- js/watch/src/audio/decoder.ts | 11 +++++++++++ js/watch/src/video/decoder.ts | 24 +++++++++++++++++++++--- 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/js/hang/src/container/legacy.ts b/js/hang/src/container/legacy.ts index 68498f88e..8532153c4 100644 --- a/js/hang/src/container/legacy.ts +++ b/js/hang/src/container/legacy.ts @@ -280,15 +280,24 @@ export class Consumer { throw new Error("multiple calls to decode not supported"); } - const wait = new Promise((resolve) => { - this.#notify = resolve; - }).then(() => true); - - if (!(await Promise.race([wait, this.#signals.closed]))) { - this.#notify = undefined; - // Consumer was closed while waiting for a new frame. - return undefined; - } + // Wait for either a new frame (notify) or the consumer to close (abort). + // We listen on the AbortSignal directly and remove the listener when + // notify wins; racing against a never-settling Promise leaks a + // PromiseReaction per call (each one retaining the awaiter's result). + const abort = this.#signals.abort; + if (abort.aborted) return undefined; + + const aborted = await new Promise((resolve) => { + const onAbort = () => resolve(true); + abort.addEventListener("abort", onAbort, { once: true }); + this.#notify = () => { + abort.removeEventListener("abort", onAbort); + resolve(false); + }; + }); + + this.#notify = undefined; + if (aborted) return undefined; } } diff --git a/js/watch/src/audio/decoder.ts b/js/watch/src/audio/decoder.ts index 2facbfafa..b87f3e600 100644 --- a/js/watch/src/audio/decoder.ts +++ b/js/watch/src/audio/decoder.ts @@ -269,6 +269,14 @@ export class Decoder { this.#buffered.update(() => decode); }); + // Track open groups so effect cleanup can close them; otherwise + // in-flight readFrame() awaits would never resolve when sub closes. + const openGroups = new Set(); + effect.cleanup(() => { + for (const group of openGroups) group.close(); + openGroups.clear(); + }); + effect.spawn(async () => { const loaded = await Util.Libav.polyfill(); if (!loaded) return; // cancelled @@ -295,6 +303,8 @@ export class Decoder { const group = await sub.recvGroup(); if (!group) break; + openGroups.add(group); + effect.spawn(async () => { try { for (;;) { @@ -322,6 +332,7 @@ export class Decoder { } } } finally { + openGroups.delete(group); group.close(); } }); diff --git a/js/watch/src/video/decoder.ts b/js/watch/src/video/decoder.ts index fed21f65c..00a65c4fa 100644 --- a/js/watch/src/video/decoder.ts +++ b/js/watch/src/video/decoder.ts @@ -308,7 +308,12 @@ class DecoderTrack { effect.spawn(async () => { for (;;) { - const next = await Promise.race([consumer.next(), effect.cancel]); + // consumer.close() runs from effect.cleanup; that aborts the + // consumer, which makes next() return undefined and we break. + // Don't race against effect.cancel — racing a hot loop against + // a long-lived non-settling promise leaks a PromiseReaction + // per call. + const next = await consumer.next(); if (!next) break; const { frame, group } = next; @@ -376,19 +381,31 @@ class DecoderTrack { this.buffered.update(() => decode); }); + // Track open groups so we can close them on cleanup. Closing the parent + // sub doesn't necessarily cascade to in-flight GroupConsumers, and we + // want recvGroup/readFrame to terminate naturally instead of racing + // against effect.cancel (which leaks PromiseReactions per call). + const openGroups = new Set(); + effect.cleanup(() => { + for (const group of openGroups) group.close(); + openGroups.clear(); + }); + effect.spawn(async () => { // Process data segments // TODO: Use a consumer wrapper for CMAF to support latency control for (;;) { - const group = await Promise.race([sub.recvGroup(), effect.cancel]); + const group = await sub.recvGroup(); if (!group) break; + openGroups.add(group); + effect.spawn(async () => { let previous: Time.Micro | undefined; try { for (;;) { - const segment = await Promise.race([group.readFrame(), effect.cancel]); + const segment = await group.readFrame(); if (!segment) break; const samples = Container.Cmaf.decodeDataSegment(segment, timescale); @@ -422,6 +439,7 @@ class DecoderTrack { } } } finally { + openGroups.delete(group); group.close(); } }); From 6e1a72fdc64bebdbd949bcbd08c07707dff4d5f5 Mon Sep 17 00:00:00 2001 From: Simon Kirsten Date: Sun, 3 May 2026 17:14:04 +0200 Subject: [PATCH 2/3] refactor(watch): consolidate video pacing into the renderer's rAF MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, frame pacing happened in the decoder via Sync.wait(), which slept on a setTimeout / #update race per in-flight frame to wake near vsync. The renderer then ran a single-shot rAF on every frame signal update to paint. Two pacing layers, two places that could stall, and systematic beating when the display refresh and content fps line up (the wait resolves a fraction of a ms past vsync, the signal-driven rAF schedules against the next vsync, so you display every other frame). Move pacing to the renderer: - DecoderTrack queues decoded frames immediately into #queue: VideoFrame[]. The output callback no longer awaits anything. - Decoder.consume() returns the newest queued frame whose PTS is ≤ sync.now(), closing any older entries; caller takes ownership. Late-frame drop and decode-buffer trim move here. - The renderer's rAF calls decoder.consume() each vsync. - Drop Sync.wait() and the #update notification machinery — buffer changes now take effect on the next vsync (≤16ms), which is fine for live media. Drop the "first-paint preview" path and Decoder.frame mirror that fed it. The preview was painting decoder.frame.peek() while the buffer filled, which saved 20–100ms of black canvas at first paint but added a special case to the render loop and held a cloned VideoFrame resident at all times for every active track. Replace with two narrower signals: DecoderTrack.display, set once from the first decoded frame's displayWidth/displayHeight as a catalog-display fallback; and #runBuffering now reacts to consume-time `timestamp` instead of decode-time frame presence. Net: one rAF loop in the system. Pacing happens at paint time, where vsync is. Stalls show up as held frames instead of dropped frames. --- js/watch/src/sync.ts | 46 +---------- js/watch/src/video/decoder.ts | 141 ++++++++++++++++++--------------- js/watch/src/video/renderer.ts | 38 ++++----- 3 files changed, 93 insertions(+), 132 deletions(-) diff --git a/js/watch/src/sync.ts b/js/watch/src/sync.ts index 4980c117f..cb0e99d1a 100644 --- a/js/watch/src/sync.ts +++ b/js/watch/src/sync.ts @@ -36,10 +36,6 @@ export class Sync { #buffer = new Signal(Time.Milli.zero); readonly buffer: Signal = this.#buffer; - // A ghetto way to learn when the reference/buffer changes. - // There's probably a way to use Effect, but lets keep it simple for now. - #update: PromiseWithResolvers; - // The media timestamp of the most recently received frame. readonly timestamp = new Signal(undefined); @@ -62,8 +58,6 @@ export class Sync { this.audio = Signal.from(props?.audio); this.video = Signal.from(props?.video); - this.#update = Promise.withResolvers(); - this.signals.run(this.#runJitter.bind(this)); this.signals.run(this.#runBuffer.bind(this)); } @@ -105,9 +99,6 @@ export class Sync { const buffer = Time.Milli.add(Time.Milli.max(video, audio), jitter); this.#buffer.set(buffer); - - this.#update.resolve(); - this.#update = Promise.withResolvers(); } // Update the reference if this is the earliest frame we've seen, relative to its timestamp. @@ -118,9 +109,9 @@ export class Sync { const currentRef = this.#reference.peek(); if (currentRef !== undefined) { - // Check if `wait()` would not sleep at all. - // NOTE: We check here instead of in `wait()` so we can identify when frames are received late. - // Otherwise, chained `wait()` calls would cause a false-positive during CPU starvation. + // Detect frames received later than their scheduled play time, so we can + // distinguish "the network is dropping us behind" from CPU starvation in + // downstream pacing. const sleep = Time.Milli.add(Time.Milli.sub(currentRef, ref), this.#buffer.peek()); if (sleep < 0) { const entry = this.#late.get(label); @@ -147,8 +138,6 @@ export class Sync { } this.#reference.set(ref); - this.#update.resolve(); - this.#update = Promise.withResolvers(); } // The PTS that should be rendering right now, derived from the reference + buffer. @@ -159,35 +148,6 @@ export class Sync { return Time.Milli.sub(Time.Milli.sub(Time.Milli.now(), reference), this.#buffer.peek()); } - // Sleep until it's time to render this frame. - async wait(timestamp: Time.Milli): Promise { - const reference = this.#reference.peek(); - if (reference === undefined) { - throw new Error("reference not set; call update() first"); - } - - for (;;) { - // Sleep until it's time to decode the next frame. - // NOTE: This function runs in parallel for each frame. - const now = Time.Milli.now(); - const ref = Time.Milli.sub(now, timestamp); - - const currentRef = this.#reference.peek(); - if (currentRef === undefined) return; - - const sleep = Time.Milli.add(Time.Milli.sub(currentRef, ref), this.#buffer.peek()); - if (sleep <= 0) return; - - // Skip setTimeout for small sleeps; the timer resolution (~4ms) would overshoot. - if (sleep < 5) return; - - const wait = new Promise((resolve) => setTimeout(resolve, sleep)).then(() => true); - - const ok = await Promise.race([this.#update.promise, wait]); - if (ok) return; - } - } - static #formatDuration(ms: number): string { ms = Math.round(ms); if (ms < 1000) return `${ms}ms`; diff --git a/js/watch/src/video/decoder.ts b/js/watch/src/video/decoder.ts index 00a65c4fa..33fb3306f 100644 --- a/js/watch/src/video/decoder.ts +++ b/js/watch/src/video/decoder.ts @@ -28,11 +28,7 @@ export class Decoder implements Backend { // The current track running, held so we can cancel it when the new track is ready. #active = new Signal(undefined); - // Expose the current frame to render as a signal - #frame = new Signal(undefined); - readonly frame: Getter = this.#frame; - - // The timestamp of the current frame. + // The timestamp of the most recently consumed frame. #timestamp = new Signal(undefined); readonly timestamp: Getter = this.#timestamp; @@ -52,6 +48,19 @@ export class Decoder implements Backend { #signals = new Effect(); + // Pop the newest decoded frame whose PTS is at or before sync.now(), closing + // any older queued frames. The caller takes ownership of the returned frame + // and is responsible for closing it. Returns undefined if no frame is ready. + consume(): VideoFrame | undefined { + const active = this.#active.peek(); + if (!active) return undefined; + + const now = this.source.sync.now(); + if (now === undefined) return undefined; + + return active.consume(now); + } + constructor(source: Source, props?: DecoderProps) { this.enabled = Signal.from(props?.enabled ?? false); @@ -121,15 +130,6 @@ export class Decoder implements Backend { effect.cleanup(() => active.close()); - // Clone the frame so we own it independently of the DecoderTrack. - // proxy() would share the same reference, allowing the source to close our frame. - effect.run((inner) => { - const frame = inner.get(active.frame); - this.#frame.update((prev) => { - prev?.close(); - return frame?.clone(); - }); - }); effect.proxy(this.#timestamp, active.timestamp); effect.proxy(this.#buffered, active.buffered); } @@ -147,21 +147,21 @@ export class Decoder implements Backend { return; } - const frame = effect.get(this.frame); - if (!frame) return; + const active = effect.get(this.#active); + if (!active) return; - effect.set(this.#display, { - width: frame.displayWidth, - height: frame.displayHeight, - }); + const dims = effect.get(active.display); + if (!dims) return; + + effect.set(this.#display, dims); } #runBuffering(effect: Effect): void { const enabled = effect.get(this.enabled); if (!enabled) return; - const frame = effect.get(this.frame); - if (!frame) { + const timestamp = effect.get(this.#timestamp); + if (timestamp === undefined) { this.#stalled.set(true); return; } @@ -174,11 +174,6 @@ export class Decoder implements Backend { } close() { - this.#frame.update((prev) => { - prev?.close(); - return undefined; - }); - this.#signals.close(); } } @@ -200,7 +195,10 @@ class DecoderTrack { stats: Signal; timestamp = new Signal(undefined); - frame = new Signal(undefined); + + // Display dimensions taken from the first decoded frame, used as a fallback + // when the catalog doesn't carry display metadata. + display = new Signal<{ width: number; height: number } | undefined>(undefined); // Network jitter + decode buffer. buffered = new Signal([]); @@ -208,6 +206,10 @@ class DecoderTrack { // Decoded frames waiting to be rendered. #buffered = new Signal([]); + // Decoded frames awaiting paint, in PTS-ascending order. VideoDecoder + // emits in display order, so push order is already monotonic. + #queue: VideoFrame[] = []; + signals = new Effect(); constructor(props: DecoderTrackProps) { @@ -228,41 +230,24 @@ class DecoderTrack { effect.cleanup(() => sub.close()); const decoder = new VideoDecoder({ - output: async (frame: VideoFrame) => { - try { - const timestamp = Time.Milli.fromMicro(frame.timestamp as Time.Micro); - if (timestamp < (this.timestamp.peek() ?? 0)) { - // Late frame, don't render it. - return; - } - - if (this.frame.peek() === undefined) { - // Render something while we wait for the sync to catch up. - this.frame.set(frame.clone()); - } - - const wait = this.source.sync.wait(timestamp).then(() => true); - const ok = await Promise.race([wait, effect.cancel]); - if (!ok) return; - - if (timestamp < (this.timestamp.peek() ?? 0)) { - // Late frame, don't render it. - // NOTE: This can happen when the ref is updated, such as on playback start. - return; - } - - this.timestamp.set(timestamp); - - // Trim the decode buffer as frames are rendered - this.#trimBuffered(timestamp); + output: (frame: VideoFrame) => { + const timestamp = Time.Milli.fromMicro(frame.timestamp as Time.Micro); - this.frame.update((prev) => { - prev?.close(); - return frame.clone(); // avoid closing the frame here - }); - } finally { + // Drop frames that have already been displayed (can happen if the + // reference resets, e.g. on playback start). + if (timestamp < (this.timestamp.peek() ?? 0)) { frame.close(); + return; } + + // Capture display dimensions from the first frame so #runDisplay + // can fall back to them when the catalog has no display metadata. + if (this.display.peek() === undefined) { + this.display.set({ width: frame.displayWidth, height: frame.displayHeight }); + } + + // Queue for the renderer to pick up on its next vsync. + this.#queue.push(frame); }, // TODO bubble up error error: (error) => { @@ -272,6 +257,9 @@ class DecoderTrack { }); effect.cleanup(() => { if (decoder.state !== "closed") decoder.close(); + // Drain any frames the renderer never got to. + for (const frame of this.#queue) frame.close(); + this.#queue.length = 0; }); // Input processing - depends on container type @@ -479,13 +467,36 @@ class DecoderTrack { }); } + // Pop the newest queued frame whose PTS is ≤ now, closing any older ones. + // Caller takes ownership of the returned frame and must close it. + consume(now: Time.Milli): VideoFrame | undefined { + let pickIdx = -1; + for (let i = this.#queue.length - 1; i >= 0; i--) { + const ts = Time.Milli.fromMicro(this.#queue[i].timestamp as Time.Micro); + if (ts <= now) { + pickIdx = i; + break; + } + } + if (pickIdx < 0) return undefined; + + // Close older frames — they would render in the past. + for (let i = 0; i < pickIdx; i++) { + this.#queue[i].close(); + } + + const frame = this.#queue[pickIdx]; + this.#queue.splice(0, pickIdx + 1); + + const timestamp = Time.Milli.fromMicro(frame.timestamp as Time.Micro); + this.timestamp.set(timestamp); + this.#trimBuffered(timestamp); + + return frame; + } + close(): void { this.signals.close(); - - this.frame.update((prev) => { - prev?.close(); - return undefined; - }); } } diff --git a/js/watch/src/video/renderer.ts b/js/watch/src/video/renderer.ts index f40adcdc3..69e42ec82 100644 --- a/js/watch/src/video/renderer.ts +++ b/js/watch/src/video/renderer.ts @@ -7,7 +7,7 @@ export type RendererProps = { paused?: boolean | Signal; }; -// An component to render a video to a canvas. +// A component to render a video to a canvas. export class Renderer { decoder: Decoder; @@ -108,40 +108,30 @@ export class Renderer { const ctx = effect.get(this.#ctx); if (!ctx) return; - const paused = effect.get(this.paused); - - // Read new frames from the decoder when not paused. - let decoded: VideoFrame | undefined; - if (!paused) { - decoded = effect.get(this.decoder.frame); - } + let rafId: number | undefined; - // Request a callback to render the frame based on the monitor's refresh rate. - // Always render, even when paused (to show last frame) - let animate: number | undefined = requestAnimationFrame(() => { - const frame = decoded ?? this.frame.peek(); - this.#render(ctx, frame); - - // Update signals to reflect what's actually on screen. - if (decoded) { + const tick = () => { + const frame = this.decoder.consume(); + if (frame) { + this.#draw(ctx, frame); this.frame.update((old) => { old?.close(); - return decoded.clone(); + return frame; // transfer ownership from consume() }); - this.timestamp.set(Time.Milli.fromMicro(decoded.timestamp as Time.Micro)); + this.timestamp.set(Time.Milli.fromMicro(frame.timestamp as Time.Micro)); } - animate = undefined; - }); + rafId = requestAnimationFrame(tick); + }; + + rafId = requestAnimationFrame(tick); - // Clean up any pending animation request. effect.cleanup(() => { - decoded?.close(); - if (animate) cancelAnimationFrame(animate); + if (rafId !== undefined) cancelAnimationFrame(rafId); }); } - #render(ctx: CanvasRenderingContext2D, frame?: VideoFrame) { + #draw(ctx: CanvasRenderingContext2D, frame?: VideoFrame) { if (!frame) { // Clear canvas when no frame ctx.fillStyle = "#000"; From 00eeac92ab40ae312fed594241963c262af5812b Mon Sep 17 00:00:00 2001 From: Simon Kirsten Date: Sun, 3 May 2026 19:33:26 +0200 Subject: [PATCH 3/3] fix(watch): close MultiBackend's sync and sources MultiBackend.close() only closed its own #runElement signal, leaving the Sync and the two Source instances (constructed as fields, not inside an Effect) alive. Each owns its own Effect with several .run() handlers that never get cleaned up, surfacing as "Signals was garbage collected without being closed" warnings whenever a watch component unmounts. Close them explicitly, ordered so inner Decoder/Renderer/Emitter (spawned via the #runElement effect) finish before the sources/sync they reference are torn down. --- js/watch/src/backend.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/js/watch/src/backend.ts b/js/watch/src/backend.ts index 03cd057f1..148fab6ec 100644 --- a/js/watch/src/backend.ts +++ b/js/watch/src/backend.ts @@ -217,5 +217,8 @@ export class MultiBackend implements Backend { close(): void { this.signals.close(); + this.#videoSource.close(); + this.#audioSource.close(); + this.sync.close(); } }