-
Notifications
You must be signed in to change notification settings - Fork 20
feat(shared-runtime)!: use weak waker in trigger [APMSP-3371] #2050
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
VianneyRuhlmann
wants to merge
3
commits into
main
Choose a base branch
from
vianney/use-weak-waker
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+197
−26
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! # Goal | ||
| //! | ||
| //! This module solves a very specific problem. | ||
| //! | ||
| //! The problem happens when we use a queue, or any async message passing type in Rust, | ||
| //! where one end of the pipe is in a task spawned in an async runtime and the other end | ||
| //! is kept outside of the async runtime. | ||
| //! | ||
| //! When we `await` on the end of the pipe inside of the task, a waker is passed to the | ||
| //! `Future` object, and shared with the other end of the pipe outside of the async | ||
| //! runtime. | ||
| //! | ||
| //! The waker needs to keep a reference to the async runtime scheduler so that the user | ||
| //! outside of the task can notify the async scheduler to poll the task again. | ||
| //! | ||
| //! Whenever we want to drop the async runtime, if the task is suspended on one end of | ||
| //! the pipe, the task will be dropped but the waker has been shared between both ends | ||
| //! of the pipe and can keep the async runtime alive for longer than needed, preventing | ||
| //! resources from being freed. | ||
| //! | ||
| //! When a future that has been wrapped by [`wrap`] is `await`-ed the following | ||
| //! happens: | ||
| //! * the waker passed to the future is wrapped in a level of indirection, that allows dropping the | ||
| //! original waker without coordinating with the end of the pipe outside of the runtime. | ||
| //! * a reference to the wrapper waker is stored inside of the task, so that if the task is dropped, | ||
| //! we drop the original waker. | ||
| //! | ||
| //! When the async runtime holding the task is dropped, the task will be dropped which | ||
| //! will free the original waker, allowing the async runtime's scheduler to be dropped | ||
| //! too. | ||
|
|
||
| use std::future::Future; | ||
| use std::ops::Deref; | ||
| use std::pin::Pin; | ||
| use std::sync::Arc; | ||
| use std::task::Context; | ||
|
|
||
| use futures_util::task::{waker_ref, ArcWake, AtomicWaker, WakerRef}; | ||
|
|
||
| /// Wraps an [`AtomicWaker`] to create our own waker. | ||
| /// | ||
| /// [`AtomicWaker`] is in essence an `Option<Waker>`, which allows us to drop the | ||
| /// reference to the original whenever the task that needs to be woken is dropped. | ||
| struct WeakWakerInner { | ||
| waker: AtomicWaker, | ||
| } | ||
|
|
||
| impl ArcWake for WeakWakerInner { | ||
| fn wake_by_ref(arc_self: &Arc<Self>) { | ||
| arc_self.waker.wake(); | ||
| } | ||
| } | ||
|
|
||
| fn make_waker(waker: &Arc<WeakWakerInner>) -> WakerRef<'_> { | ||
| waker_ref(waker) | ||
| } | ||
|
|
||
| struct WeakWaker { | ||
| inner: Arc<WeakWakerInner>, | ||
| } | ||
|
|
||
| impl Drop for WeakWaker { | ||
| fn drop(&mut self) { | ||
| // Drop the stored reference to the original waker so that resources held by | ||
| // the original waker (e.g. the async runtime scheduler) can be released. | ||
| self.inner.waker.take(); | ||
| } | ||
| } | ||
|
|
||
| /// Wrap a future so that the waker passed to it is held only weakly. | ||
| /// | ||
| /// See the [module-level documentation](self) for details. | ||
| #[must_use = "futures do nothing unless you `.await` or poll them"] | ||
| pub fn wrap<F: Future>(fut: F) -> WeakWakerFuture<F> { | ||
| WeakWakerFuture { | ||
| fut, | ||
| weak_waker: None, | ||
| } | ||
| } | ||
|
|
||
| pub struct WeakWakerFuture<F: Future> { | ||
| fut: F, | ||
| weak_waker: Option<WeakWaker>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naive question: isn't there a double |
||
| } | ||
|
|
||
| impl<F: Future> Future for WeakWakerFuture<F> { | ||
| type Output = F::Output; | ||
|
|
||
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { | ||
| // SAFETY: | ||
| // Neither `weak_waker` nor `fut` are going to be moved out of `self`. | ||
| let m = unsafe { self.get_unchecked_mut() }; | ||
|
|
||
| // On the first poll, allocate the WeakWakerInner Arc. | ||
| // On subsequent polls, reuse it and update the stored waker in-place via | ||
| // AtomicWaker::register — no heap allocation. | ||
| let inner = if let Some(ref ww) = m.weak_waker { | ||
| ww.inner.waker.register(cx.waker()); | ||
| &ww.inner | ||
| } else { | ||
| let w = AtomicWaker::new(); | ||
| w.register(cx.waker()); | ||
| &m.weak_waker | ||
| .insert(WeakWaker { | ||
| inner: Arc::new(WeakWakerInner { waker: w }), | ||
| }) | ||
| .inner | ||
| }; | ||
|
|
||
| // SAFETY: structural pinning for `fut`. The shared borrow of `m.weak_waker` | ||
| // and the mutable borrow of `m.fut` are on disjoint fields; NLL allows this. | ||
| unsafe { | ||
| Pin::new_unchecked(&mut m.fut).poll(&mut Context::from_waker(make_waker(inner).deref())) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use std::future::Future; | ||
| use std::pin::Pin; | ||
| use std::sync::atomic::{AtomicBool, Ordering}; | ||
| use std::sync::Arc; | ||
| use std::task::Context; | ||
|
|
||
| use futures::task::waker; | ||
| use futures_util::task::ArcWake; | ||
|
|
||
| struct TestWaker { | ||
| waked: AtomicBool, | ||
| } | ||
|
|
||
| impl ArcWake for TestWaker { | ||
| fn wake_by_ref(s: &Arc<Self>) { | ||
| s.waked.store(true, Ordering::SeqCst); | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_mpsc_queue_weak_waiter_drop_correctly() { | ||
| let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>(); | ||
|
|
||
| let mut fut = super::wrap(futures::StreamExt::next(&mut rx)); | ||
| let pinned_fut = Pin::new(&mut fut); | ||
| let base_waker = Arc::new(TestWaker { | ||
| waked: AtomicBool::new(false), | ||
| }); | ||
| assert!(pinned_fut | ||
| .poll(&mut Context::from_waker(&waker(base_waker.clone()))) | ||
| .is_pending()); | ||
| assert_eq!(Arc::strong_count(&base_waker), 2); | ||
| drop(fut); | ||
| assert!(!base_waker.waked.load(Ordering::SeqCst)); | ||
| assert_eq!(Arc::strong_count(&base_waker), 1); | ||
| tx.unbounded_send(()).unwrap(); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_mpsc_queue_weak_waiter_smoke() { | ||
| let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>(); | ||
|
|
||
| let mut fut = super::wrap(futures::StreamExt::next(&mut rx)); | ||
| let mut pinned_fut = Pin::new(&mut fut); | ||
| let base_waker = Arc::new(TestWaker { | ||
| waked: AtomicBool::new(false), | ||
| }); | ||
| assert!(pinned_fut | ||
| .as_mut() | ||
| .poll(&mut Context::from_waker(&waker(base_waker.clone()))) | ||
| .is_pending()); | ||
| assert_eq!(Arc::strong_count(&base_waker), 2); | ||
| tx.unbounded_send(()).unwrap(); | ||
| assert!(base_waker.waked.load(Ordering::SeqCst)); | ||
| assert!(pinned_fut | ||
| .poll(&mut Context::from_waker(&waker(base_waker.clone()))) | ||
| .is_ready()); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: should this be
WeakWakerFuture::new, or at leastWeakWakerFuture::wrapinstead?