Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions js/hang/src/container/legacy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,24 @@ export class Consumer {
throw new Error("multiple calls to decode not supported");
}

const wait = new Promise<void>((resolve) => {
this.#notify = resolve;
}).then(() => true);

if (!(await Promise.race([wait, this.#signals.closed]))) {
this.#notify = undefined;
// Consumer was closed while waiting for a new frame.
return undefined;
}
// Wait for either a new frame (notify) or the consumer to close (abort).
// We listen on the AbortSignal directly and remove the listener when
// notify wins; racing against a never-settling Promise leaks a
// PromiseReaction per call (each one retaining the awaiter's result).
const abort = this.#signals.abort;
if (abort.aborted) return undefined;

const aborted = await new Promise<boolean>((resolve) => {
const onAbort = () => resolve(true);
abort.addEventListener("abort", onAbort, { once: true });
this.#notify = () => {
abort.removeEventListener("abort", onAbort);
resolve(false);
};
});

this.#notify = undefined;
if (aborted) return undefined;
}
}

Expand Down
11 changes: 11 additions & 0 deletions js/watch/src/audio/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ export class Decoder {
this.#buffered.update(() => decode);
});

// Track open groups so effect cleanup can close them; otherwise
// in-flight readFrame() awaits would never resolve when sub closes.
const openGroups = new Set<Moq.Group>();
effect.cleanup(() => {
for (const group of openGroups) group.close();
openGroups.clear();
});

effect.spawn(async () => {
const loaded = await Util.Libav.polyfill();
if (!loaded) return; // cancelled
Expand All @@ -295,6 +303,8 @@ export class Decoder {
const group = await sub.recvGroup();
if (!group) break;

openGroups.add(group);

effect.spawn(async () => {
try {
for (;;) {
Expand Down Expand Up @@ -322,6 +332,7 @@ export class Decoder {
}
}
} finally {
openGroups.delete(group);
group.close();
}
});
Expand Down
3 changes: 3 additions & 0 deletions js/watch/src/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,5 +217,8 @@ export class MultiBackend implements Backend {

// Tear down the backend: stop reactive effects first, then release the
// media sources and the A/V sync state.
// NOTE(review): `signals.close()` is called before the sources — presumably
// so effects observing the sources are cancelled before the sources close;
// confirm before reordering.
close(): void {
this.signals.close();
this.#videoSource.close();
this.#audioSource.close();
this.sync.close();
}
}
46 changes: 3 additions & 43 deletions js/watch/src/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ export class Sync {
#buffer = new Signal<Time.Milli>(Time.Milli.zero);
readonly buffer: Signal<Time.Milli> = this.#buffer;

// A ghetto way to learn when the reference/buffer changes.
// There's probably a way to use Effect, but lets keep it simple for now.
#update: PromiseWithResolvers<void>;

// The media timestamp of the most recently received frame.
readonly timestamp = new Signal<Time.Milli | undefined>(undefined);

Expand All @@ -62,8 +58,6 @@ export class Sync {
this.audio = Signal.from(props?.audio);
this.video = Signal.from(props?.video);

this.#update = Promise.withResolvers();

this.signals.run(this.#runJitter.bind(this));
this.signals.run(this.#runBuffer.bind(this));
}
Expand Down Expand Up @@ -105,9 +99,6 @@ export class Sync {

const buffer = Time.Milli.add(Time.Milli.max(video, audio), jitter);
this.#buffer.set(buffer);

this.#update.resolve();
this.#update = Promise.withResolvers();
}

// Update the reference if this is the earliest frame we've seen, relative to its timestamp.
Expand All @@ -118,9 +109,9 @@ export class Sync {
const currentRef = this.#reference.peek();

if (currentRef !== undefined) {
// Check if `wait()` would not sleep at all.
// NOTE: We check here instead of in `wait()` so we can identify when frames are received late.
// Otherwise, chained `wait()` calls would cause a false-positive during CPU starvation.
// Detect frames received later than their scheduled play time, so we can
// distinguish "the network is dropping us behind" from CPU starvation in
// downstream pacing.
const sleep = Time.Milli.add(Time.Milli.sub(currentRef, ref), this.#buffer.peek());
if (sleep < 0) {
const entry = this.#late.get(label);
Expand All @@ -147,8 +138,6 @@ export class Sync {
}

this.#reference.set(ref);
this.#update.resolve();
this.#update = Promise.withResolvers();
}

// The PTS that should be rendering right now, derived from the reference + buffer.
Expand All @@ -159,35 +148,6 @@ export class Sync {
return Time.Milli.sub(Time.Milli.sub(Time.Milli.now(), reference), this.#buffer.peek());
}

/**
 * Sleep until it's time to render the frame at `timestamp`.
 *
 * Computes the remaining delay from the current `#reference` and `#buffer`,
 * then sleeps via `setTimeout`, racing the timer against `#update.promise`
 * so a reference/buffer change re-runs the calculation instead of
 * overshooting with a stale delay.
 *
 * @param timestamp media timestamp (ms) of the frame to wait for.
 * @throws Error if no reference has been established yet via `update()`.
 */
async wait(timestamp: Time.Milli): Promise<void> {
const reference = this.#reference.peek();
if (reference === undefined) {
throw new Error("reference not set; call update() first");
}

for (;;) {
// Sleep until it's time to decode the next frame.
// NOTE: This function runs in parallel for each frame.
const now = Time.Milli.now();
const ref = Time.Milli.sub(now, timestamp);

// Re-read the reference each iteration; it may have been cleared or
// moved by update()/#runBuffer while we were sleeping.
const currentRef = this.#reference.peek();
if (currentRef === undefined) return;

const sleep = Time.Milli.add(Time.Milli.sub(currentRef, ref), this.#buffer.peek());
if (sleep <= 0) return;

// Skip setTimeout for small sleeps; the timer resolution (~4ms) would overshoot.
if (sleep < 5) return;

// Timer branch resolves `true`; `#update.promise` resolves `undefined`
// (falsy), so the race result distinguishes "slept long enough" from
// "reference/buffer changed, recompute the delay".
const wait = new Promise((resolve) => setTimeout(resolve, sleep)).then(() => true);

const ok = await Promise.race([this.#update.promise, wait]);
if (ok) return;
}
}

static #formatDuration(ms: number): string {
ms = Math.round(ms);
if (ms < 1000) return `${ms}ms`;
Expand Down
Loading
Loading