moq-lite: rewrite Origin as a poll-driven, conducer-based model#1358
moq-lite: rewrite Origin as a poll-driven, conducer-based model#1358
Conversation
Replace the OriginNode/NotifyNode tree, per-publish web_async::spawn cleanup, and per-consumer mpsc fan-out with a flat HashMap behind a parking-lot-style Mutex plus per-consumer queues. Wakers register on both the global state and each tracked broadcast's `poll_closed`, so broadcast closures wake consumers directly — no spawned cleanup tasks and no more `tokio::time::sleep(1ms)` in tests. Renames (with one-line migrations across the workspace): publish_broadcast -> publish create_broadcast -> create consume_broadcast -> dropped (use wait_for_broadcast / try_next) publish_only -> scope consume_only -> scope (on OriginConsumer) announced -> next (returns OriginUpdate enum) try_announced -> try_next announced_broadcast -> wait_for_broadcast Active/Ended semantics, shortest-hop preference, is_clone dedup, and "newer wins ties" all preserved. Active selection still lives in the producer so relay forwarders don't have to reimplement it. Also exposes BroadcastConsumer::poll_closed and is_closed so callers that need to compose close-detection without spawning have a primitive. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
WalkthroughA comprehensive refactoring of the MoQ-Lite origin model replaces the tuple-based broadcast publishing and announcement mechanisms with a stream-based 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 7/8 reviews remaining, refill in 7 minutes and 30 seconds.Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
rs/moq-ffi/src/origin.rs (1)
118-123:⚠️ Potential issue | 🟠 Major | ⚡ Quick winThis no longer waits for the exact broadcast path.
with_root(path)subscribes to the whole subtree underpath, andAnnounced::available()returns the firstActiveupdate it sees. That meansannounced_broadcast("foo")can resolve withfoo/barif the nested broadcast arrives first. Please drive this throughOriginConsumer::wait_for_broadcast(path)or store the requested path and filter for an exact match.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-ffi/src/origin.rs` around lines 118 - 123, The function announced_broadcast currently uses self.inner.clone().with_root(path) which subscribes to the whole subtree and returns the first Active update (via Announced::available), allowing nested paths (e.g., "foo/bar") to satisfy a request for "foo". Change announced_broadcast to obtain the exact broadcast origin for the requested path by calling the consumer method that waits for an exact match (OriginConsumer::wait_for_broadcast(path)) or, if you prefer keeping with_root, capture and store the requested path and filter Announced::available events until the update.path == requested_path; then construct the MoqAnnouncedBroadcast using Task::new(Announced { inner: exact_origin }) and return it as before.rs/moq-lite/src/lite/subscriber.rs (1)
223-232:⚠️ Potential issue | 🟠 Major | ⚡ Quick winHandle rejected origin publishes before registering the announce locally.
OriginProducer::publish()can returnfalsehere, but this path still keeps the entry inproducersand spawnsrun_broadcast. If the publish was rejected, the announce becomes locally “active” even though no origin consumer can ever see it, and a later real announce on the same path will trip the duplicate-path check. Roll back the inserted producer and skip spawning whenpublish()returnsfalse.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/lite/subscriber.rs` around lines 223 - 232, The code currently inserts the producer and then calls OriginProducer::publish(...); if publish returns false you must roll back that insertion and avoid spawning run_broadcast. Change the flow so after creating `dynamic = broadcast.dynamic()` you call `let published = self.origin.as_mut().unwrap().publish(path.clone(), broadcast.consume());` and if `published` is false remove the just-inserted producer from `self.producers` (or undo whatever created the local announce) and do not call `web_async::spawn(self.clone().run_broadcast(path, dynamic));` — only spawn run_broadcast when publish returns true. Ensure the rollback targets the same producer entry created earlier so duplicate-path checks remain correct.rs/moq-lite/src/ietf/subscriber.rs (1)
415-423:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDon't treat a rejected
publish()as a successful announce.This migration now calls
OriginProducer::publish(...), but the return value is ignored. If the origin rejects the publish,state.broadcastsstill records the path andrun_broadcaststill starts, so later announces on that path can be rejected as duplicates even though nothing was actually published. Please fail or roll back the entry whenpublish()returnsfalse.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 415 - 423, The Vacant-branch currently ignores OriginProducer::publish(...)’s boolean result so a rejected publish still inserts a BroadcastState and returns a Broadcast; update the Entry::Vacant handling (the block that creates Broadcast::new().produce(), calls origin.publish(path.clone(), ...), inserts BroadcastState and returns broadcast) to check the publish return value and abort/roll back on false: if publish(...) returns false, do not insert into the map (do not create or record BroadcastState in state.broadcasts), do not start run_broadcast for that path, and return an error or a None/appropriate failure value to the caller instead of the broadcast; ensure you reference OriginProducer::publish, BroadcastState, Entry::Vacant and run_broadcast when making the change so the insert and broadcast-start are skipped/undone on rejection.rs/moq-boy/src/input.rs (1)
72-97:⚠️ Potential issue | 🟠 Major | ⚡ Quick winAvoid emitting
ViewerLeftdirectly onOriginUpdate::Ended.This can produce duplicate/spurious offline events during path handoff (
Endedmay be immediately followed byActive), and the spawned command task already emitsViewerLeftwhen the stream actually ends.Suggested adjustment
- let (path, broadcast) = match update { - moq_lite::OriginUpdate::Active(p, b) => (p, Some(b)), - moq_lite::OriginUpdate::Ended(p) => (p, None), - }; + let (path, broadcast) = match update { + moq_lite::OriginUpdate::Active(p, b) => (p, b), + moq_lite::OriginUpdate::Ended(p) => { + tracing::debug!(viewer_id = %p, "viewer broadcast ended"); + continue; + } + }; let viewer_id = path.to_string(); - if let Some(broadcast) = broadcast { + { tracing::info!(%viewer_id, "viewer connected"); let cmd_tx = cmd_tx.clone(); let vid = viewer_id.clone(); tokio::spawn(async move { if let Err(e) = handle_viewer_commands(&vid, broadcast, &cmd_tx).await { tracing::warn!(viewer_id = %vid, error = %e, "viewer command error"); } tracing::info!(viewer_id = %vid, "viewer disconnected"); let _ = cmd_tx.send(Command::ViewerLeft { viewer_id: vid }).await; }); - } else { - tracing::info!(%viewer_id, "viewer went offline"); - let _ = cmd_tx - .send(Command::ViewerLeft { - viewer_id: viewer_id.clone(), - }) - .await; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-boy/src/input.rs` around lines 72 - 97, Don't send Command::ViewerLeft when matching moq_lite::OriginUpdate::Ended; that causes duplicate/offline events because the spawned task from the Active case already sends ViewerLeft when the stream closes. In the match/if block around OriginUpdate::Active/Ended, keep the existing tokio::spawn + handle_viewer_commands flow for Active (including the send in the spawned task), but for the Ended branch remove the cmd_tx.send(Command::ViewerLeft { .. }).await and only emit a log (tracing::info! or tracing::debug!) for viewer offline; this ensures ViewerLeft is only emitted by the spawned task handling the broadcast and prevents spurious duplicate events.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Around line 113-126: The current scoped-consumer usage can return a descendant
(e.g., "foo/bar") first and incorrectly fail; after creating the consumer via
origin.consume().scope(&[path.as_path()]) you must drain the consumer (loop over
consumer.try_next()) until you find an OriginUpdate::Active where p.as_path() ==
path and then return Ok(b), or until try_next() returns None in which case
return Err(Error::BroadcastNotFound); keep the same origin.consume(),
scope(&[path.as_path()]), and matching on moq_lite::OriginUpdate::Active(p, b)
but iterate instead of a single try_next() call.
- Around line 135-137: The current wrapper in origin.rs calls
self.active.get_mut(origin) and then origin.publish(path, broadcast) but always
returns Ok(()), which masks rejections signaled by OriginProducer::publish()
returning false; update the code to check the boolean result of
OriginProducer::publish(path, broadcast) and propagate failure by returning an
Err variant (e.g., Err(Error::PublishRejected) or an appropriate existing Error)
when publish returns false, otherwise return Ok(()). Ensure you reference the
same origin variable from self.active.get_mut(origin) and preserve existing
error handling for get_mut (ok_or(Error::OriginNotFound)?).
---
Outside diff comments:
In `@rs/moq-boy/src/input.rs`:
- Around line 72-97: Don't send Command::ViewerLeft when matching
moq_lite::OriginUpdate::Ended; that causes duplicate/offline events because the
spawned task from the Active case already sends ViewerLeft when the stream
closes. In the match/if block around OriginUpdate::Active/Ended, keep the
existing tokio::spawn + handle_viewer_commands flow for Active (including the
send in the spawned task), but for the Ended branch remove the
cmd_tx.send(Command::ViewerLeft { .. }).await and only emit a log
(tracing::info! or tracing::debug!) for viewer offline; this ensures ViewerLeft
is only emitted by the spawned task handling the broadcast and prevents spurious
duplicate events.
In `@rs/moq-ffi/src/origin.rs`:
- Around line 118-123: The function announced_broadcast currently uses
self.inner.clone().with_root(path) which subscribes to the whole subtree and
returns the first Active update (via Announced::available), allowing nested
paths (e.g., "foo/bar") to satisfy a request for "foo". Change
announced_broadcast to obtain the exact broadcast origin for the requested path
by calling the consumer method that waits for an exact match
(OriginConsumer::wait_for_broadcast(path)) or, if you prefer keeping with_root,
capture and store the requested path and filter Announced::available events
until the update.path == requested_path; then construct the
MoqAnnouncedBroadcast using Task::new(Announced { inner: exact_origin }) and
return it as before.
In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 415-423: The Vacant-branch currently ignores
OriginProducer::publish(...)’s boolean result so a rejected publish still
inserts a BroadcastState and returns a Broadcast; update the Entry::Vacant
handling (the block that creates Broadcast::new().produce(), calls
origin.publish(path.clone(), ...), inserts BroadcastState and returns broadcast)
to check the publish return value and abort/roll back on false: if publish(...)
returns false, do not insert into the map (do not create or record
BroadcastState in state.broadcasts), do not start run_broadcast for that path,
and return an error or a None/appropriate failure value to the caller instead of
the broadcast; ensure you reference OriginProducer::publish, BroadcastState,
Entry::Vacant and run_broadcast when making the change so the insert and
broadcast-start are skipped/undone on rejection.
In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 223-232: The code currently inserts the producer and then calls
OriginProducer::publish(...); if publish returns false you must roll back that
insertion and avoid spawning run_broadcast. Change the flow so after creating
`dynamic = broadcast.dynamic()` you call `let published =
self.origin.as_mut().unwrap().publish(path.clone(), broadcast.consume());` and
if `published` is false remove the just-inserted producer from `self.producers`
(or undo whatever created the local announce) and do not call
`web_async::spawn(self.clone().run_broadcast(path, dynamic));` — only spawn
run_broadcast when publish returns true. Ensure the rollback targets the same
producer entry created earlier so duplicate-path checks remain correct.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3a30bf84-46cf-4da8-a44d-256ee6e5983f
📒 Files selected for processing (23)
rs/hang/examples/subscribe.rsrs/hang/examples/video.rsrs/libmoq/src/origin.rsrs/moq-boy/src/input.rsrs/moq-boy/src/main.rsrs/moq-cli/src/client.rsrs/moq-cli/src/server.rsrs/moq-clock/src/main.rsrs/moq-ffi/src/origin.rsrs/moq-gst/src/sink/imp.rsrs/moq-gst/src/source/imp.rsrs/moq-lite/src/ietf/publisher.rsrs/moq-lite/src/ietf/subscriber.rsrs/moq-lite/src/lite/publisher.rsrs/moq-lite/src/lite/subscriber.rsrs/moq-lite/src/model/broadcast.rsrs/moq-lite/src/model/origin.rsrs/moq-native/examples/chat.rsrs/moq-native/tests/backend.rsrs/moq-native/tests/broadcast.rsrs/moq-relay/src/cluster.rsrs/moq-relay/src/connection.rsrs/moq-relay/src/web.rs
| use moq_lite::AsPath; | ||
| let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?; | ||
| // TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait | ||
| // for gossip instead of racing it. | ||
| #[allow(deprecated)] | ||
| origin.consume().consume_broadcast(path).ok_or(Error::BroadcastNotFound) | ||
| // TODO: expose an async variant so FFI callers can wait for gossip instead of racing it. | ||
| // Scope a fresh consumer to the requested path; the constructor's replay puts the | ||
| // currently-active broadcast (if any) at the head of the queue. | ||
| let path = path.as_path(); | ||
| let mut consumer = origin | ||
| .consume() | ||
| .scope(&[path.as_path()]) | ||
| .ok_or(Error::BroadcastNotFound)?; | ||
| match consumer.try_next() { | ||
| Some(moq_lite::OriginUpdate::Active(p, b)) if p.as_path() == path => Ok(b), | ||
| _ => Err(Error::BroadcastNotFound), | ||
| } |
There was a problem hiding this comment.
Scoped replay is not an exact-path lookup.
scope(&[path]) still includes descendants, and the replay queue is filled in HashMap iteration order. If both foo and foo/bar are active, try_next() can yield foo/bar first and this returns BroadcastNotFound even though foo exists. Drain the scoped queue until you either find Active(p == path) or exhaust it.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/libmoq/src/origin.rs` around lines 113 - 126, The current scoped-consumer
usage can return a descendant (e.g., "foo/bar") first and incorrectly fail;
after creating the consumer via origin.consume().scope(&[path.as_path()]) you
must drain the consumer (loop over consumer.try_next()) until you find an
OriginUpdate::Active where p.as_path() == path and then return Ok(b), or until
try_next() returns None in which case return Err(Error::BroadcastNotFound); keep
the same origin.consume(), scope(&[path.as_path()]), and matching on
moq_lite::OriginUpdate::Active(p, b) but iterate instead of a single try_next()
call.
| let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?; | ||
| origin.publish_broadcast(path, broadcast); | ||
| origin.publish(path, broadcast); | ||
| Ok(()) |
There was a problem hiding this comment.
Propagate failed publishes instead of always returning success.
OriginProducer::publish() now signals rejection with false, but this wrapper always returns Ok(()). That turns authorization/loop-detection failures into false success for the libmoq API.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/libmoq/src/origin.rs` around lines 135 - 137, The current wrapper in
origin.rs calls self.active.get_mut(origin) and then origin.publish(path,
broadcast) but always returns Ok(()), which masks rejections signaled by
OriginProducer::publish() returning false; update the code to check the boolean
result of OriginProducer::publish(path, broadcast) and propagate failure by
returning an Err variant (e.g., Err(Error::PublishRejected) or an appropriate
existing Error) when publish returns false, otherwise return Ok(()). Ensure you
reference the same origin variable from self.active.get_mut(origin) and preserve
existing error handling for get_mut (ok_or(Error::OriginNotFound)?).
Summary
Replaces the
OriginNode/NotifyNodetree, per-publishweb_async::spawncleanup, and per-consumermpscfan-out with a flatHashMap<PathOwned, Entry>behind aMutexplus per-consumer queues. Consumers register a singleconducer::Waiteron both the shared state and each tracked broadcast'spoll_closed, so a broadcast closing wakes its consumers directly — no spawned cleanup tasks, notokio::time::sleep(1ms)in tests, no 127-message tokio-mpsc bug.Active/Ended semantics, shortest-hop preference,
is_clonededup, and "newer wins ties" are all preserved. Active selection stays in the producer so relay forwarders don't have to reimplement the bookkeeping.API renames (one-liner migrations applied across every workspace caller):
publish_broadcastpublishcreate_broadcastcreateconsume_broadcastwait_for_broadcast/try_next)publish_onlyscopeconsume_onlyscope(onOriginConsumer)announcednext(returnsOriginUpdateenum)try_announcedtry_nextannounced_broadcastwait_for_broadcastAlso exposes
BroadcastConsumer::poll_closedandis_closedso callers that need to compose close-detection without spawning have a primitive.Test plan
cargo test -p moq-lite --lib model::origin— 34/34 origin tests passcargo test --workspace— all unit + integration tests passcargo build --workspacejust check🤖 Generated with Claude Code