From 0c684330c6b58abda7dd2725ef85de1faaa32bfb Mon Sep 17 00:00:00 2001 From: vianney Date: Thu, 28 May 2026 14:41:09 +0200 Subject: [PATCH 1/2] chore(shared-runtime): add weak waker to shared-runtime Co-authored-by: paullegranddc <82819397+paullegranddc@users.noreply.github.com> --- libdd-shared-runtime/Cargo.toml | 6 +- libdd-shared-runtime/src/lib.rs | 1 + .../src/shared_runtime/pausable_worker.rs | 12 +- libdd-shared-runtime/src/weak_waker.rs | 181 ++++++++++++++++++ 4 files changed, 196 insertions(+), 4 deletions(-) create mode 100644 libdd-shared-runtime/src/weak_waker.rs diff --git a/libdd-shared-runtime/Cargo.toml b/libdd-shared-runtime/Cargo.toml index 0fd91df2f8..9f310d7395 100644 --- a/libdd-shared-runtime/Cargo.toml +++ b/libdd-shared-runtime/Cargo.toml @@ -18,6 +18,7 @@ bench = false [dependencies] async-trait = "0.1" futures = { version = "0.3", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3", default-features = false, features = ["alloc"] } tokio = { version = "1.23", features = ["rt", "macros", "time"] } tokio-util = "0.7.11" tracing = { version = "0.1", default-features = false } @@ -36,4 +37,7 @@ tokio = { version = "1.23", features = ["rt-multi-thread"] } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" -futures-util = { version = "0.3", default-features = false, features = ["channel"] } +futures-util = { version = "0.3", default-features = false, features = ["alloc", "channel"] } + +[dev-dependencies] +futures = { version = "0.3", features = ["executor"] } diff --git a/libdd-shared-runtime/src/lib.rs b/libdd-shared-runtime/src/lib.rs index e9dc0ee642..0e813e26d6 100644 --- a/libdd-shared-runtime/src/lib.rs +++ b/libdd-shared-runtime/src/lib.rs @@ -17,6 +17,7 @@ //! deadlocks in child processes. pub mod shared_runtime; +mod weak_waker; pub mod worker; // Top-level re-exports for convenience diff --git a/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs b/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs index 7610fa7725..139ae1c94a 100644 --- a/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs +++ b/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs @@ -3,6 +3,7 @@ //! Defines a pausable worker to be able to stop background processes before forks +use crate::weak_waker; use crate::worker::Worker; use core::pin::Pin; use libdd_capabilities::spawn::SpawnError; @@ -116,13 +117,18 @@ impl PausableWorker { let stop_token = CancellationToken::new(); let cloned_token = stop_token.clone(); let future = Box::pin(async move { - // First iteration using initial_trigger + // First iteration using initial_trigger. + // + // `trigger`/`initial_trigger` are wrapped with [`weak_waker::wrap`] so the + // waker handed out to the worker (and potentially shared with code + // outside of this runtime, e.g. the non-runtime end of a channel) does + // not keep the runtime scheduler alive after this task is dropped. select! { biased; _ = cloned_token.cancelled() => { return worker; } - _ = worker.initial_trigger() => { + _ = weak_waker::wrap(worker.initial_trigger()) => { worker.run().await; } } @@ -134,7 +140,7 @@ impl PausableWorker { _ = cloned_token.cancelled() => { break; } - _ = worker.trigger() => { + _ = weak_waker::wrap(worker.trigger()) => { worker.run().await; } } diff --git a/libdd-shared-runtime/src/weak_waker.rs b/libdd-shared-runtime/src/weak_waker.rs new file mode 100644 index 0000000000..c9e922daa5 --- /dev/null +++ b/libdd-shared-runtime/src/weak_waker.rs @@ -0,0 +1,181 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! # Goal +//! +//! This module solves a very specific problem. +//! +//! The problem happens when we use a queue, or any async message passing type in Rust, +//! where one end of the pipe is in a task spawned in an async runtime and the other end +//! is kept outside of the async runtime. +//! +//! When we `await` on the end of the pipe inside of the task, a waker is passed to the +//! `Future` object, and shared with the other end of the pipe outside of the async +//! runtime. +//! +//! The waker needs to keep a reference to the async runtime scheduler so that the user +//! outside of the task can notify the async scheduler to poll the task again. +//! +//! Whenever we want to drop the async runtime, if the task is suspended on one end of +//! the pipe, the task will be dropped but the waker has been shared between both ends +//! of the pipe and can keep the async runtime alive for longer than needed, preventing +//! resources from being freed. +//! +//! When a future that has been wrapped by [`wrap`] is `await`-ed the following +//! happens: +//! * the waker passed to the future is wrapped in a level of indirection, that allows dropping the +//! original waker without coordinating with the end of the pipe outside of the runtime. +//! * a reference to the wrapper waker is stored inside of the task, so that if the task is dropped, +//! we drop the original waker. +//! +//! When the async runtime holding the task is dropped, the task will be dropped which +//! will free the original waker, allowing the async runtime's scheduler to be dropped +//! too. + +use std::future::Future; +use std::ops::Deref; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; + +use futures_util::task::{waker_ref, ArcWake, AtomicWaker, WakerRef}; + +/// Wraps an [`AtomicWaker`] to create our own waker. +/// +/// [`AtomicWaker`] is in essence an `Option`, which allows us to drop the +/// reference to the original whenever the task that needs to be woken is dropped. +struct WeakWakerInner { + waker: AtomicWaker, +} + +impl ArcWake for WeakWakerInner { + fn wake_by_ref(arc_self: &Arc) { + arc_self.waker.wake(); + } +} + +fn make_waker(waker: &Arc) -> WakerRef<'_> { + waker_ref(waker) +} + +struct WeakWaker { + inner: Arc, +} + +impl Drop for WeakWaker { + fn drop(&mut self) { + // Drop the stored reference to the original waker so that resources held by + // the original waker (e.g. the async runtime scheduler) can be released. + self.inner.waker.take(); + } +} + +/// Wrap a future so that the waker passed to it is held only weakly. +/// +/// See the [module-level documentation](self) for details. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub fn wrap(fut: F) -> WeakWakerFuture { + WeakWakerFuture { + fut, + weak_waker: None, + } +} + +pub struct WeakWakerFuture { + fut: F, + weak_waker: Option, +} + +impl Future for WeakWakerFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { + // SAFETY: + // Neither `weak_waker` nor `fut` are going to be moved out of `self`. + let m = unsafe { self.get_unchecked_mut() }; + + // On the first poll, allocate the WeakWakerInner Arc. + // On subsequent polls, reuse it and update the stored waker in-place via + // AtomicWaker::register — no heap allocation. + let inner = if let Some(ref ww) = m.weak_waker { + ww.inner.waker.register(cx.waker()); + &ww.inner + } else { + let w = AtomicWaker::new(); + w.register(cx.waker()); + &m.weak_waker + .insert(WeakWaker { + inner: Arc::new(WeakWakerInner { waker: w }), + }) + .inner + }; + + // SAFETY: structural pinning for `fut`. The shared borrow of `m.weak_waker` + // and the mutable borrow of `m.fut` are on disjoint fields; NLL allows this. + unsafe { + Pin::new_unchecked(&mut m.fut).poll(&mut Context::from_waker(make_waker(inner).deref())) + } + } +} + +#[cfg(test)] +mod tests { + use std::future::Future; + use std::pin::Pin; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::task::Context; + + use futures::task::waker; + use futures_util::task::ArcWake; + + struct TestWaker { + waked: AtomicBool, + } + + impl ArcWake for TestWaker { + fn wake_by_ref(s: &Arc) { + s.waked.store(true, Ordering::SeqCst); + } + } + + #[test] + fn test_mpsc_queue_weak_waiter_drop_correctly() { + let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>(); + + let mut fut = super::wrap(futures::StreamExt::next(&mut rx)); + let pinned_fut = Pin::new(&mut fut); + let base_waker = Arc::new(TestWaker { + waked: AtomicBool::new(false), + }); + assert!(pinned_fut + .poll(&mut Context::from_waker(&waker(base_waker.clone()))) + .is_pending()); + assert_eq!(Arc::strong_count(&base_waker), 2); + drop(fut); + assert!(!base_waker.waked.load(Ordering::SeqCst)); + assert_eq!(Arc::strong_count(&base_waker), 1); + tx.unbounded_send(()).unwrap(); + } + + #[test] + fn test_mpsc_queue_weak_waiter_smoke() { + let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>(); + + let mut fut = super::wrap(futures::StreamExt::next(&mut rx)); + let mut pinned_fut = Pin::new(&mut fut); + let base_waker = Arc::new(TestWaker { + waked: AtomicBool::new(false), + }); + assert!(pinned_fut + .as_mut() + .poll(&mut Context::from_waker(&waker(base_waker.clone()))) + .is_pending()); + assert_eq!(Arc::strong_count(&base_waker), 2); + tx.unbounded_send(()).unwrap(); + assert!(base_waker.waked.load(Ordering::SeqCst)); + assert!(pinned_fut + .poll(&mut Context::from_waker(&waker(base_waker.clone()))) + .is_ready()); + } +} From 28fc7cab98a34217dc580527fb6583d57efd9ddd Mon Sep 17 00:00:00 2001 From: vianney Date: Thu, 28 May 2026 14:52:55 +0200 Subject: [PATCH 2/2] feat(worker)!: remove worker on_pause --- libdd-data-pipeline/src/agent_info/fetcher.rs | 12 ------------ .../src/shared_runtime/pausable_worker.rs | 3 +-- libdd-shared-runtime/src/worker.rs | 8 -------- 3 files changed, 1 insertion(+), 22 deletions(-) diff --git a/libdd-data-pipeline/src/agent_info/fetcher.rs b/libdd-data-pipeline/src/agent_info/fetcher.rs index 131e4c04fc..29326ce11f 100644 --- a/libdd-data-pipeline/src/agent_info/fetcher.rs +++ b/libdd-data-pipeline/src/agent_info/fetcher.rs @@ -195,7 +195,6 @@ pub struct AgentInfoFetcher { info_endpoint: Endpoint, refresh_interval: Duration, trigger_rx: Option>, - trigger_tx: mpsc::Sender<()>, /// `C` lives on the struct because `Worker::run(&mut self)` (a fixed trait /// signature) calls `fetch_info_with_state::()` internally. _phantom: PhantomData, @@ -216,7 +215,6 @@ impl AgentInfoFetcher { info_endpoint, refresh_interval, trigger_rx: Some(trigger_rx), - trigger_tx: trigger_tx.clone(), _phantom: PhantomData, }; @@ -271,16 +269,6 @@ impl Wor } } - async fn on_pause(&mut self) { - // Release the IoStack waker stored in trigger_rx by waking the channel and drain the - // message to avoid a spurious fetch on restart. If the channel is not empty then it has - // already been waked. - if self.trigger_rx.as_ref().is_some_and(|rx| rx.is_empty()) { - let _ = self.trigger_tx.try_send(()); - self.drain(); - }; - } - async fn run(&mut self) { self.fetch_and_update().await; } diff --git a/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs b/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs index 139ae1c94a..976aad4db6 100644 --- a/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs +++ b/libdd-shared-runtime/src/shared_runtime/pausable_worker.rs @@ -176,9 +176,8 @@ impl PausableWorker { stop_token.cancel(); } - if let Ok(mut worker) = handle.await { + if let Ok(worker) = handle.await { debug!(?worker, "Worker paused successfully"); - worker.on_pause().await; *self = PausableWorker::Paused { worker }; Ok(()) } else { diff --git a/libdd-shared-runtime/src/worker.rs b/libdd-shared-runtime/src/worker.rs index 9d76ab8374..4666ddbf2d 100644 --- a/libdd-shared-runtime/src/worker.rs +++ b/libdd-shared-runtime/src/worker.rs @@ -38,10 +38,6 @@ pub trait Worker: std::fmt::Debug + MaybeSend { /// Reset the worker state. Called in the child after a fork to cleanup parent state. fn reset(&mut self) {} - /// Hook called after the worker has been paused (e.g. before a fork). - /// Default is a no-op. - async fn on_pause(&mut self) {} - /// Hook called when the app is shutting down. Can be used to flush remaining data. async fn shutdown(&mut self) {} } @@ -66,10 +62,6 @@ impl Worker for Box { (**self).reset() } - async fn on_pause(&mut self) { - (**self).on_pause().await - } - async fn shutdown(&mut self) { (**self).shutdown().await }