diff --git a/js/hang/src/container/legacy.ts b/js/hang/src/container/legacy.ts index 68498f88e..8532153c4 100644 --- a/js/hang/src/container/legacy.ts +++ b/js/hang/src/container/legacy.ts @@ -280,15 +280,24 @@ export class Consumer { throw new Error("multiple calls to decode not supported"); } - const wait = new Promise((resolve) => { - this.#notify = resolve; - }).then(() => true); - - if (!(await Promise.race([wait, this.#signals.closed]))) { - this.#notify = undefined; - // Consumer was closed while waiting for a new frame. - return undefined; - } + // Wait for either a new frame (notify) or the consumer to close (abort). + // We listen on the AbortSignal directly and remove the listener when + // notify wins; racing against a never-settling Promise leaks a + // PromiseReaction per call (each one retaining the awaiter's result). + const abort = this.#signals.abort; + if (abort.aborted) return undefined; + + const aborted = await new Promise((resolve) => { + const onAbort = () => resolve(true); + abort.addEventListener("abort", onAbort, { once: true }); + this.#notify = () => { + abort.removeEventListener("abort", onAbort); + resolve(false); + }; + }); + + this.#notify = undefined; + if (aborted) return undefined; } } diff --git a/js/watch/src/audio/decoder.ts b/js/watch/src/audio/decoder.ts index 2facbfafa..b87f3e600 100644 --- a/js/watch/src/audio/decoder.ts +++ b/js/watch/src/audio/decoder.ts @@ -269,6 +269,14 @@ export class Decoder { this.#buffered.update(() => decode); }); + // Track open groups so effect cleanup can close them; otherwise + // in-flight readFrame() awaits would never resolve when sub closes. + const openGroups = new Set(); + effect.cleanup(() => { + for (const group of openGroups) group.close(); + openGroups.clear(); + }); + effect.spawn(async () => { const loaded = await Util.Libav.polyfill(); if (!loaded) return; // cancelled @@ -295,6 +303,8 @@ export class Decoder { const group = await sub.recvGroup(); if (!group) break; + openGroups.add(group); + effect.spawn(async () => { try { for (;;) { @@ -322,6 +332,7 @@ export class Decoder { } } } finally { + openGroups.delete(group); group.close(); } }); diff --git a/js/watch/src/backend.ts b/js/watch/src/backend.ts index 03cd057f1..148fab6ec 100644 --- a/js/watch/src/backend.ts +++ b/js/watch/src/backend.ts @@ -217,5 +217,8 @@ export class MultiBackend implements Backend { close(): void { this.signals.close(); + this.#videoSource.close(); + this.#audioSource.close(); + this.sync.close(); } } diff --git a/js/watch/src/sync.ts b/js/watch/src/sync.ts index 4980c117f..cb0e99d1a 100644 --- a/js/watch/src/sync.ts +++ b/js/watch/src/sync.ts @@ -36,10 +36,6 @@ export class Sync { #buffer = new Signal(Time.Milli.zero); readonly buffer: Signal = this.#buffer; - // A ghetto way to learn when the reference/buffer changes. - // There's probably a way to use Effect, but lets keep it simple for now. - #update: PromiseWithResolvers; - // The media timestamp of the most recently received frame. readonly timestamp = new Signal(undefined); @@ -62,8 +58,6 @@ export class Sync { this.audio = Signal.from(props?.audio); this.video = Signal.from(props?.video); - this.#update = Promise.withResolvers(); - this.signals.run(this.#runJitter.bind(this)); this.signals.run(this.#runBuffer.bind(this)); } @@ -105,9 +99,6 @@ export class Sync { const buffer = Time.Milli.add(Time.Milli.max(video, audio), jitter); this.#buffer.set(buffer); - - this.#update.resolve(); - this.#update = Promise.withResolvers(); } // Update the reference if this is the earliest frame we've seen, relative to its timestamp. @@ -118,9 +109,9 @@ export class Sync { const currentRef = this.#reference.peek(); if (currentRef !== undefined) { - // Check if `wait()` would not sleep at all. - // NOTE: We check here instead of in `wait()` so we can identify when frames are received late. - // Otherwise, chained `wait()` calls would cause a false-positive during CPU starvation. + // Detect frames received later than their scheduled play time, so we can + // distinguish "the network is dropping us behind" from CPU starvation in + // downstream pacing. const sleep = Time.Milli.add(Time.Milli.sub(currentRef, ref), this.#buffer.peek()); if (sleep < 0) { const entry = this.#late.get(label); @@ -147,8 +138,6 @@ export class Sync { } this.#reference.set(ref); - this.#update.resolve(); - this.#update = Promise.withResolvers(); } // The PTS that should be rendering right now, derived from the reference + buffer. @@ -159,35 +148,6 @@ export class Sync { return Time.Milli.sub(Time.Milli.sub(Time.Milli.now(), reference), this.#buffer.peek()); } - // Sleep until it's time to render this frame. - async wait(timestamp: Time.Milli): Promise { - const reference = this.#reference.peek(); - if (reference === undefined) { - throw new Error("reference not set; call update() first"); - } - - for (;;) { - // Sleep until it's time to decode the next frame. - // NOTE: This function runs in parallel for each frame. - const now = Time.Milli.now(); - const ref = Time.Milli.sub(now, timestamp); - - const currentRef = this.#reference.peek(); - if (currentRef === undefined) return; - - const sleep = Time.Milli.add(Time.Milli.sub(currentRef, ref), this.#buffer.peek()); - if (sleep <= 0) return; - - // Skip setTimeout for small sleeps; the timer resolution (~4ms) would overshoot. - if (sleep < 5) return; - - const wait = new Promise((resolve) => setTimeout(resolve, sleep)).then(() => true); - - const ok = await Promise.race([this.#update.promise, wait]); - if (ok) return; - } - } - static #formatDuration(ms: number): string { ms = Math.round(ms); if (ms < 1000) return `${ms}ms`; diff --git a/js/watch/src/video/decoder.ts b/js/watch/src/video/decoder.ts index fed21f65c..33fb3306f 100644 --- a/js/watch/src/video/decoder.ts +++ b/js/watch/src/video/decoder.ts @@ -28,11 +28,7 @@ export class Decoder implements Backend { // The current track running, held so we can cancel it when the new track is ready. #active = new Signal(undefined); - // Expose the current frame to render as a signal - #frame = new Signal(undefined); - readonly frame: Getter = this.#frame; - - // The timestamp of the current frame. + // The timestamp of the most recently consumed frame. #timestamp = new Signal(undefined); readonly timestamp: Getter = this.#timestamp; @@ -52,6 +48,19 @@ export class Decoder implements Backend { #signals = new Effect(); + // Pop the newest decoded frame whose PTS is at or before sync.now(), closing + // any older queued frames. The caller takes ownership of the returned frame + // and is responsible for closing it. Returns undefined if no frame is ready. + consume(): VideoFrame | undefined { + const active = this.#active.peek(); + if (!active) return undefined; + + const now = this.source.sync.now(); + if (now === undefined) return undefined; + + return active.consume(now); + } + constructor(source: Source, props?: DecoderProps) { this.enabled = Signal.from(props?.enabled ?? false); @@ -121,15 +130,6 @@ export class Decoder implements Backend { effect.cleanup(() => active.close()); - // Clone the frame so we own it independently of the DecoderTrack. - // proxy() would share the same reference, allowing the source to close our frame. - effect.run((inner) => { - const frame = inner.get(active.frame); - this.#frame.update((prev) => { - prev?.close(); - return frame?.clone(); - }); - }); effect.proxy(this.#timestamp, active.timestamp); effect.proxy(this.#buffered, active.buffered); } @@ -147,21 +147,21 @@ export class Decoder implements Backend { return; } - const frame = effect.get(this.frame); - if (!frame) return; + const active = effect.get(this.#active); + if (!active) return; - effect.set(this.#display, { - width: frame.displayWidth, - height: frame.displayHeight, - }); + const dims = effect.get(active.display); + if (!dims) return; + + effect.set(this.#display, dims); } #runBuffering(effect: Effect): void { const enabled = effect.get(this.enabled); if (!enabled) return; - const frame = effect.get(this.frame); - if (!frame) { + const timestamp = effect.get(this.#timestamp); + if (timestamp === undefined) { this.#stalled.set(true); return; } @@ -174,11 +174,6 @@ export class Decoder implements Backend { } close() { - this.#frame.update((prev) => { - prev?.close(); - return undefined; - }); - this.#signals.close(); } } @@ -200,7 +195,10 @@ class DecoderTrack { stats: Signal; timestamp = new Signal(undefined); - frame = new Signal(undefined); + + // Display dimensions taken from the first decoded frame, used as a fallback + // when the catalog doesn't carry display metadata. + display = new Signal<{ width: number; height: number } | undefined>(undefined); // Network jitter + decode buffer. buffered = new Signal([]); @@ -208,6 +206,10 @@ class DecoderTrack { // Decoded frames waiting to be rendered. #buffered = new Signal([]); + // Decoded frames awaiting paint, in PTS-ascending order. VideoDecoder + // emits in display order, so push order is already monotonic. + #queue: VideoFrame[] = []; + signals = new Effect(); constructor(props: DecoderTrackProps) { @@ -228,41 +230,24 @@ class DecoderTrack { effect.cleanup(() => sub.close()); const decoder = new VideoDecoder({ - output: async (frame: VideoFrame) => { - try { - const timestamp = Time.Milli.fromMicro(frame.timestamp as Time.Micro); - if (timestamp < (this.timestamp.peek() ?? 0)) { - // Late frame, don't render it. - return; - } - - if (this.frame.peek() === undefined) { - // Render something while we wait for the sync to catch up. - this.frame.set(frame.clone()); - } - - const wait = this.source.sync.wait(timestamp).then(() => true); - const ok = await Promise.race([wait, effect.cancel]); - if (!ok) return; - - if (timestamp < (this.timestamp.peek() ?? 0)) { - // Late frame, don't render it. - // NOTE: This can happen when the ref is updated, such as on playback start. - return; - } - - this.timestamp.set(timestamp); - - // Trim the decode buffer as frames are rendered - this.#trimBuffered(timestamp); + output: (frame: VideoFrame) => { + const timestamp = Time.Milli.fromMicro(frame.timestamp as Time.Micro); - this.frame.update((prev) => { - prev?.close(); - return frame.clone(); // avoid closing the frame here - }); - } finally { + // Drop frames that have already been displayed (can happen if the + // reference resets, e.g. on playback start). + if (timestamp < (this.timestamp.peek() ?? 0)) { frame.close(); + return; } + + // Capture display dimensions from the first frame so #runDisplay + // can fall back to them when the catalog has no display metadata. + if (this.display.peek() === undefined) { + this.display.set({ width: frame.displayWidth, height: frame.displayHeight }); + } + + // Queue for the renderer to pick up on its next vsync. + this.#queue.push(frame); }, // TODO bubble up error error: (error) => { @@ -272,6 +257,9 @@ class DecoderTrack { }); effect.cleanup(() => { if (decoder.state !== "closed") decoder.close(); + // Drain any frames the renderer never got to. + for (const frame of this.#queue) frame.close(); + this.#queue.length = 0; }); // Input processing - depends on container type @@ -308,7 +296,12 @@ class DecoderTrack { effect.spawn(async () => { for (;;) { - const next = await Promise.race([consumer.next(), effect.cancel]); + // consumer.close() runs from effect.cleanup; that aborts the + // consumer, which makes next() return undefined and we break. + // Don't race against effect.cancel — racing a hot loop against + // a long-lived non-settling promise leaks a PromiseReaction + // per call. + const next = await consumer.next(); if (!next) break; const { frame, group } = next; @@ -376,19 +369,31 @@ class DecoderTrack { this.buffered.update(() => decode); }); + // Track open groups so we can close them on cleanup. Closing the parent + // sub doesn't necessarily cascade to in-flight GroupConsumers, and we + // want recvGroup/readFrame to terminate naturally instead of racing + // against effect.cancel (which leaks PromiseReactions per call). + const openGroups = new Set(); + effect.cleanup(() => { + for (const group of openGroups) group.close(); + openGroups.clear(); + }); + effect.spawn(async () => { // Process data segments // TODO: Use a consumer wrapper for CMAF to support latency control for (;;) { - const group = await Promise.race([sub.recvGroup(), effect.cancel]); + const group = await sub.recvGroup(); if (!group) break; + openGroups.add(group); + effect.spawn(async () => { let previous: Time.Micro | undefined; try { for (;;) { - const segment = await Promise.race([group.readFrame(), effect.cancel]); + const segment = await group.readFrame(); if (!segment) break; const samples = Container.Cmaf.decodeDataSegment(segment, timescale); @@ -422,6 +427,7 @@ class DecoderTrack { } } } finally { + openGroups.delete(group); group.close(); } }); @@ -461,13 +467,36 @@ class DecoderTrack { }); } + // Pop the newest queued frame whose PTS is ≤ now, closing any older ones. + // Caller takes ownership of the returned frame and must close it. + consume(now: Time.Milli): VideoFrame | undefined { + let pickIdx = -1; + for (let i = this.#queue.length - 1; i >= 0; i--) { + const ts = Time.Milli.fromMicro(this.#queue[i].timestamp as Time.Micro); + if (ts <= now) { + pickIdx = i; + break; + } + } + if (pickIdx < 0) return undefined; + + // Close older frames — they would render in the past. + for (let i = 0; i < pickIdx; i++) { + this.#queue[i].close(); + } + + const frame = this.#queue[pickIdx]; + this.#queue.splice(0, pickIdx + 1); + + const timestamp = Time.Milli.fromMicro(frame.timestamp as Time.Micro); + this.timestamp.set(timestamp); + this.#trimBuffered(timestamp); + + return frame; + } + close(): void { this.signals.close(); - - this.frame.update((prev) => { - prev?.close(); - return undefined; - }); } } diff --git a/js/watch/src/video/renderer.ts b/js/watch/src/video/renderer.ts index f40adcdc3..69e42ec82 100644 --- a/js/watch/src/video/renderer.ts +++ b/js/watch/src/video/renderer.ts @@ -7,7 +7,7 @@ export type RendererProps = { paused?: boolean | Signal; }; -// An component to render a video to a canvas. +// A component to render a video to a canvas. export class Renderer { decoder: Decoder; @@ -108,40 +108,30 @@ export class Renderer { const ctx = effect.get(this.#ctx); if (!ctx) return; - const paused = effect.get(this.paused); - - // Read new frames from the decoder when not paused. - let decoded: VideoFrame | undefined; - if (!paused) { - decoded = effect.get(this.decoder.frame); - } + let rafId: number | undefined; - // Request a callback to render the frame based on the monitor's refresh rate. - // Always render, even when paused (to show last frame) - let animate: number | undefined = requestAnimationFrame(() => { - const frame = decoded ?? this.frame.peek(); - this.#render(ctx, frame); - - // Update signals to reflect what's actually on screen. - if (decoded) { + const tick = () => { + const frame = this.decoder.consume(); + if (frame) { + this.#draw(ctx, frame); this.frame.update((old) => { old?.close(); - return decoded.clone(); + return frame; // transfer ownership from consume() }); - this.timestamp.set(Time.Milli.fromMicro(decoded.timestamp as Time.Micro)); + this.timestamp.set(Time.Milli.fromMicro(frame.timestamp as Time.Micro)); } - animate = undefined; - }); + rafId = requestAnimationFrame(tick); + }; + + rafId = requestAnimationFrame(tick); - // Clean up any pending animation request. effect.cleanup(() => { - decoded?.close(); - if (animate) cancelAnimationFrame(animate); + if (rafId !== undefined) cancelAnimationFrame(rafId); }); } - #render(ctx: CanvasRenderingContext2D, frame?: VideoFrame) { + #draw(ctx: CanvasRenderingContext2D, frame?: VideoFrame) { if (!frame) { // Clear canvas when no frame ctx.fillStyle = "#000";