Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions js/hang/src/catalog/audio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ const TrackSchema = z.object({
// Mirrors AudioDecoderConfig
// https://w3c.github.io/webcodecs/#audio-decoder-config
export const AudioConfigSchema = z.object({
// Optional reference to another broadcast that publishes this track, expressed
// relative to the broadcast that served this catalog (e.g. "../source").
// If unset, the track lives in the same broadcast as the catalog.
broadcast: z.optional(z.string()),

// See: https://w3c.github.io/webcodecs/codec_registry.html
codec: z.string(),

Expand Down
1 change: 1 addition & 0 deletions js/hang/src/catalog/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * from "./chat";
export * from "./container";
export * from "./integers";
export * from "./location";
export * from "./path";
export * from "./preview";
export * from "./priority";
export * from "./root";
Expand Down
33 changes: 33 additions & 0 deletions js/hang/src/catalog/path.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Path } from "@moq/lite";

/**
* Resolve a relative broadcast reference against the path of the broadcast that served the catalog.
*
* `..` segments pop the last segment of the base path; named segments are appended.
* Excess `..` clamps at the root, returning an empty path. An empty `rel` returns the base
* path unchanged.
*
* Mirrors the Rust `Path::resolve(&PathRelative)` helper used by hang catalogs to express
* cross-broadcast track references.
*
* @example
* ```typescript
* resolveBroadcast(Path.from("a/b/c"), "../source"); // "a/b/source"
* resolveBroadcast(Path.from("a/b"), "x/y"); // "a/b/x/y"
* resolveBroadcast(Path.from("a"), "../../x"); // "x"
* ```
*/
export function resolveBroadcast(base: Path.Valid, rel: string): Path.Valid {
const baseSegments = base === "" ? [] : base.split("/");
const relSegments = rel.split("/").filter((s) => s !== "");

for (const seg of relSegments) {
if (seg === "..") {
baseSegments.pop();
} else {
baseSegments.push(seg);
}
}

return Path.from(...baseSegments);
}
5 changes: 5 additions & 0 deletions js/hang/src/catalog/video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ const TrackSchema = z.object({

// Based on VideoDecoderConfig
export const VideoConfigSchema = z.object({
// Optional reference to another broadcast that publishes this track, expressed
// relative to the broadcast that served this catalog (e.g. "../source").
// If unset, the track lives in the same broadcast as the catalog.
broadcast: z.optional(z.string()),

// See: https://w3c.github.io/webcodecs/codec_registry.html
codec: z.string(),

Expand Down
4 changes: 3 additions & 1 deletion js/watch/src/audio/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ export class Decoder {
const config = effect.get(this.source.config);
if (!config) return;

const active = effect.get(broadcast.active);
// Honor a per-rendition `broadcast` override: resolve to the source broadcast if set,
// otherwise use the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const sub = active.subscribe(track, Catalog.PRIORITY.audio);
Expand Down
8 changes: 5 additions & 3 deletions js/watch/src/audio/mse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ export class Mse implements Backend {
const broadcast = effect.get(this.source.broadcast);
if (!broadcast) return;

const active = effect.get(broadcast.active);
if (!active) return;

const track = effect.get(this.source.track);
if (!track) return;

const config = effect.get(this.source.config);
if (!config) return;

// Honor a per-rendition `broadcast` override: resolve to the source broadcast if set,
// otherwise use the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const mime = `audio/mp4; codecs="${config.codec}"`;

const sourceBuffer = mediaSource.addSourceBuffer(mime);
Expand Down
24 changes: 24 additions & 0 deletions js/watch/src/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,30 @@ export class Broadcast {
});
}

/**
* Resolve the `Moq.Broadcast` that publishes a given track.
*
* If `configBroadcast` is set, treat it as a path relative to this broadcast's name and
* subscribe to the resolved broadcast on the same connection. Otherwise return the catalog's
* own active broadcast.
*
* The lifetime of any newly-opened broadcast is tied to the provided `Effect`.
*/
trackBroadcast(effect: Effect, configBroadcast: string | undefined): Moq.Broadcast | undefined {
const active = effect.get(this.active);
if (!active) return undefined;
if (!configBroadcast) return active;

const conn = effect.get(this.connection);
if (!conn) return undefined;

const basePath = effect.get(this.name);
const resolved = Catalog.resolveBroadcast(basePath, configBroadcast);
const remote = conn.consume(resolved);
effect.cleanup(() => remote.close());
return remote;
}

close() {
this.signals.close();
}
Expand Down
4 changes: 3 additions & 1 deletion js/watch/src/video/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ export class Decoder implements Backend {
}
const [_, source, track, config] = values;

const broadcast = effect.get(source.active);
// Honor a per-rendition `broadcast` override: subscribe on the resolved source
// broadcast instead of the catalog's broadcast. Falls back to the catalog's broadcast.
const broadcast = source.trackBroadcast(effect, config.broadcast);
if (!broadcast) return;

// Start a new pending effect.
Expand Down
8 changes: 5 additions & 3 deletions js/watch/src/video/mse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ export class Mse implements Backend {
const broadcast = effect.get(this.source.broadcast);
if (!broadcast) return;

const active = effect.get(broadcast.active);
if (!active) return;

const track = effect.get(this.source.track);
if (!track) return;

const config = effect.get(this.source.config);
if (!config) return;

// Honor a per-rendition `broadcast` override: resolve to the source broadcast if set,
// otherwise use the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const mime = `video/mp4; codecs="${config.codec}"`;

const sourceBuffer = mediaSource.addSourceBuffer(mime);
Expand Down
17 changes: 16 additions & 1 deletion rs/hang/examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,28 @@ async fn run_subscribe(mut consumer: moq_lite::OriginConsumer) -> anyhow::Result
codec = %config.codec,
width = ?config.coded_width,
height = ?config.coded_height,
broadcast_override = ?config.broadcast.as_ref().map(|p| p.as_str()),
"subscribing to video track"
);

// If the rendition references a different broadcast (e.g. a source feed that this
// catalog only sidecars), resolve it relative to the catalog's broadcast path and
// wait for the announcement. Otherwise subscribe on the catalog's broadcast.
let track_broadcast = match config.broadcast.as_ref() {
Some(rel) => {
let resolved = path.resolve(rel);
consumer
.announced_broadcast(&resolved)
.await
.ok_or_else(|| anyhow::anyhow!("source broadcast unavailable: {resolved}"))?
}
None => broadcast.clone(),
};

// Subscribe to the video track.
let track = moq_lite::Track::new(name.clone());

let track_consumer = broadcast.subscribe_track(
let track_consumer = track_broadcast.subscribe_track(
&track,
moq_lite::Subscription {
priority: 1,
Expand Down
1 change: 1 addition & 0 deletions rs/hang/examples/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> anyhow::Result<m
// Example video configuration
// In a real application, you would get this from the encoder
let video_config = hang::catalog::VideoConfig {
broadcast: None,
codec: hang::catalog::H264 {
profile: 0x4D, // Main profile
constraints: 0,
Expand Down
6 changes: 6 additions & 0 deletions rs/hang/src/catalog/audio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ impl Audio {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct AudioConfig {
/// Optional reference to another broadcast that publishes this track, expressed
/// relative to the broadcast that served this catalog. If unset, the track lives
/// in the same broadcast as the catalog.
#[serde(default)]
pub broadcast: Option<moq_lite::PathRelativeOwned>,

// The codec, see the registry for details:
// https://w3c.github.io/webcodecs/codec_registry.html
#[serde_as(as = "DisplayFromStr")]
Expand Down
36 changes: 36 additions & 0 deletions rs/hang/src/catalog/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ mod test {
video_renditions.insert(
"video".to_string(),
VideoConfig {
broadcast: None,
codec: H264 {
profile: 0x64,
constraints: 0x00,
Expand All @@ -160,6 +161,7 @@ mod test {
audio_renditions.insert(
"audio".to_string(),
AudioConfig {
broadcast: None,
codec: Opus,
sample_rate: 48_000,
channel_count: 2,
Expand Down Expand Up @@ -189,4 +191,38 @@ mod test {
let output = decoded.to_string().expect("failed to encode");
assert_eq!(encoded, output, "wrong encoded output");
}

#[test]
fn rendition_with_broadcast_override() {
// Decode a catalog where one rendition references a track in a sibling broadcast,
// and verify the `broadcast` field round-trips through serde.
let encoded = r#"{
"video": {
"renditions": {
"video": {
"broadcast": "../source",
"codec": "avc1.64001f",
"codedWidth": 1280,
"codedHeight": 720,
"container": {"kind": "legacy"}
}
}
}
}"#;

let parsed = Catalog::from_str(encoded).expect("failed to decode");
let rendition = parsed.video.renditions.get("video").expect("missing rendition");
assert_eq!(
rendition.broadcast.as_ref().map(|p| p.as_str()),
Some("../source"),
"broadcast field did not deserialize"
);

// Re-encoding preserves the broadcast field.
let output = parsed.to_string().expect("failed to encode");
assert!(
output.contains(r#""broadcast":"../source""#),
"broadcast field missing from re-encoded JSON: {output}"
);
}
}
9 changes: 9 additions & 0 deletions rs/hang/src/catalog/video/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ pub struct Display {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct VideoConfig {
/// Optional reference to another broadcast that publishes this track, expressed
/// relative to the broadcast that served this catalog. If unset, the track lives
/// in the same broadcast as the catalog.
///
/// This allows a worker to author a downstream catalog that points unchanged
/// renditions at the source broadcast without re-publishing the bytes.
#[serde(default)]
pub broadcast: Option<moq_lite::PathRelativeOwned>,

/// The codec, see the registry for details:
/// <https://w3c.github.io/webcodecs/codec_registry.html>
#[serde_as(as = "DisplayFromStr")]
Expand Down
Loading
Loading