Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions libdd-data-pipeline/src/agent_info/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ pub struct AgentInfoFetcher<C: HttpClientCapability + SleepCapability> {
info_endpoint: Endpoint,
refresh_interval: Duration,
trigger_rx: Option<mpsc::Receiver<()>>,
trigger_tx: mpsc::Sender<()>,
/// `C` lives on the struct because `Worker::run(&mut self)` (a fixed trait
/// signature) calls `fetch_info_with_state::<C>()` internally.
_phantom: PhantomData<C>,
Expand All @@ -216,7 +215,6 @@ impl<C: HttpClientCapability + SleepCapability> AgentInfoFetcher<C> {
info_endpoint,
refresh_interval,
trigger_rx: Some(trigger_rx),
trigger_tx: trigger_tx.clone(),
_phantom: PhantomData,
};

Expand Down Expand Up @@ -271,16 +269,6 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Wor
}
}

async fn on_pause(&mut self) {
// Release the IoStack waker stored in trigger_rx by waking the channel and drain the
// message to avoid a spurious fetch on restart. If the channel is not empty then it has
// already been waked.
if self.trigger_rx.as_ref().is_some_and(|rx| rx.is_empty()) {
let _ = self.trigger_tx.try_send(());
self.drain();
};
}

async fn run(&mut self) {
self.fetch_and_update().await;
}
Expand Down
6 changes: 5 additions & 1 deletion libdd-shared-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bench = false
[dependencies]
async-trait = "0.1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
tokio = { version = "1.23", features = ["rt", "macros", "time"] }
tokio-util = "0.7.11"
tracing = { version = "0.1", default-features = false }
Expand All @@ -36,4 +37,7 @@ tokio = { version = "1.23", features = ["rt-multi-thread"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
futures-util = { version = "0.3", default-features = false, features = ["channel"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "channel"] }

[dev-dependencies]
futures = { version = "0.3", features = ["executor"] }
1 change: 1 addition & 0 deletions libdd-shared-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! deadlocks in child processes.

pub mod shared_runtime;
mod weak_waker;
pub mod worker;

// Top-level re-exports for convenience
Expand Down
15 changes: 10 additions & 5 deletions libdd-shared-runtime/src/shared_runtime/pausable_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! Defines a pausable worker to be able to stop background processes before forks

use crate::weak_waker;
use crate::worker::Worker;
use core::pin::Pin;
use libdd_capabilities::spawn::SpawnError;
Expand Down Expand Up @@ -116,13 +117,18 @@ impl<T: Worker + MaybeSend + Sync + 'static> PausableWorker<T> {
let stop_token = CancellationToken::new();
let cloned_token = stop_token.clone();
let future = Box::pin(async move {
// First iteration using initial_trigger
// First iteration using initial_trigger.
//
// `trigger`/`initial_trigger` are wrapped with [`weak_waker::wrap`] so the
// waker handed out to the worker (and potentially shared with code
// outside of this runtime, e.g. the non-runtime end of a channel) does
// not keep the runtime scheduler alive after this task is dropped.
select! {
biased;
_ = cloned_token.cancelled() => {
return worker;
}
_ = worker.initial_trigger() => {
_ = weak_waker::wrap(worker.initial_trigger()) => {
worker.run().await;
}
}
Expand All @@ -134,7 +140,7 @@ impl<T: Worker + MaybeSend + Sync + 'static> PausableWorker<T> {
_ = cloned_token.cancelled() => {
break;
}
_ = worker.trigger() => {
_ = weak_waker::wrap(worker.trigger()) => {
worker.run().await;
}
}
Expand Down Expand Up @@ -170,9 +176,8 @@ impl<T: Worker + MaybeSend + Sync + 'static> PausableWorker<T> {
stop_token.cancel();
}

if let Ok(mut worker) = handle.await {
if let Ok(worker) = handle.await {
debug!(?worker, "Worker paused successfully");
worker.on_pause().await;
*self = PausableWorker::Paused { worker };
Ok(())
} else {
Expand Down
181 changes: 181 additions & 0 deletions libdd-shared-runtime/src/weak_waker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! # Goal
//!
//! This module solves a very specific problem.
//!
//! The problem happens when we use a queue, or any async message passing type in Rust,
//! where one end of the pipe is in a task spawned in an async runtime and the other end
//! is kept outside of the async runtime.
//!
//! When we `await` on the end of the pipe inside of the task, a waker is passed to the
//! `Future` object, and shared with the other end of the pipe outside of the async
//! runtime.
//!
//! The waker needs to keep a reference to the async runtime scheduler so that the user
//! outside of the task can notify the async scheduler to poll the task again.
//!
//! Whenever we want to drop the async runtime, if the task is suspended on one end of
//! the pipe, the task will be dropped but the waker has been shared between both ends
//! of the pipe and can keep the async runtime alive for longer than needed, preventing
//! resources from being freed.
//!
//! When a future that has been wrapped by [`wrap`] is `await`-ed the following
//! happens:
//! * the waker passed to the future is wrapped in a level of indirection, that allows dropping the
//! original waker without coordinating with the end of the pipe outside of the runtime.
//! * a reference to the wrapper waker is stored inside of the task, so that if the task is dropped,
//! we drop the original waker.
//!
//! When the async runtime holding the task is dropped, the task will be dropped which
//! will free the original waker, allowing the async runtime's scheduler to be dropped
//! too.

use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;

use futures_util::task::{waker_ref, ArcWake, AtomicWaker, WakerRef};

/// Wraps an [`AtomicWaker`] to create our own waker.
///
/// [`AtomicWaker`] is in essence an `Option<Waker>`, which allows us to drop the
/// reference to the original whenever the task that needs to be woken is dropped.
struct WeakWakerInner {
waker: AtomicWaker,
}

impl ArcWake for WeakWakerInner {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.waker.wake();
}
}

fn make_waker(waker: &Arc<WeakWakerInner>) -> WakerRef<'_> {
waker_ref(waker)
}

struct WeakWaker {
inner: Arc<WeakWakerInner>,
}

impl Drop for WeakWaker {
fn drop(&mut self) {
// Drop the stored reference to the original waker so that resources held by
// the original waker (e.g. the async runtime scheduler) can be released.
self.inner.waker.take();
}
}

/// Wrap a future so that the waker passed to it is held only weakly.
///
/// See the [module-level documentation](self) for details.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub fn wrap<F: Future>(fut: F) -> WeakWakerFuture<F> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: should this be WeakWakerFuture::new, or at least WeakWakerFuture::wrap instead?

WeakWakerFuture {
fut,
weak_waker: None,
}
}

pub struct WeakWakerFuture<F: Future> {
fut: F,
weak_waker: Option<WeakWaker>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naive question: isn't there a double Option wrapping here? If AtomicWaker is already morally an Option<Waker>, could we just use WeakWaker here and put "none" in the atomic waker?

}

impl<F: Future> Future for WeakWakerFuture<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
// SAFETY:
// Neither `weak_waker` nor `fut` are going to be moved out of `self`.
let m = unsafe { self.get_unchecked_mut() };

// On the first poll, allocate the WeakWakerInner Arc.
// On subsequent polls, reuse it and update the stored waker in-place via
// AtomicWaker::register — no heap allocation.
let inner = if let Some(ref ww) = m.weak_waker {
ww.inner.waker.register(cx.waker());
&ww.inner
} else {
let w = AtomicWaker::new();
w.register(cx.waker());
&m.weak_waker
.insert(WeakWaker {
inner: Arc::new(WeakWakerInner { waker: w }),
})
.inner
};

// SAFETY: structural pinning for `fut`. The shared borrow of `m.weak_waker`
// and the mutable borrow of `m.fut` are on disjoint fields; NLL allows this.
unsafe {
Pin::new_unchecked(&mut m.fut).poll(&mut Context::from_waker(make_waker(inner).deref()))
}
}
}

#[cfg(test)]
mod tests {
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Context;

use futures::task::waker;
use futures_util::task::ArcWake;

struct TestWaker {
waked: AtomicBool,
}

impl ArcWake for TestWaker {
fn wake_by_ref(s: &Arc<Self>) {
s.waked.store(true, Ordering::SeqCst);
}
}

#[test]
fn test_mpsc_queue_weak_waiter_drop_correctly() {
let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>();

let mut fut = super::wrap(futures::StreamExt::next(&mut rx));
let pinned_fut = Pin::new(&mut fut);
let base_waker = Arc::new(TestWaker {
waked: AtomicBool::new(false),
});
assert!(pinned_fut
.poll(&mut Context::from_waker(&waker(base_waker.clone())))
.is_pending());
assert_eq!(Arc::strong_count(&base_waker), 2);
drop(fut);
assert!(!base_waker.waked.load(Ordering::SeqCst));
assert_eq!(Arc::strong_count(&base_waker), 1);
tx.unbounded_send(()).unwrap();
}

#[test]
fn test_mpsc_queue_weak_waiter_smoke() {
let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>();

let mut fut = super::wrap(futures::StreamExt::next(&mut rx));
let mut pinned_fut = Pin::new(&mut fut);
let base_waker = Arc::new(TestWaker {
waked: AtomicBool::new(false),
});
assert!(pinned_fut
.as_mut()
.poll(&mut Context::from_waker(&waker(base_waker.clone())))
.is_pending());
assert_eq!(Arc::strong_count(&base_waker), 2);
tx.unbounded_send(()).unwrap();
assert!(base_waker.waked.load(Ordering::SeqCst));
assert!(pinned_fut
.poll(&mut Context::from_waker(&waker(base_waker.clone())))
.is_ready());
}
}
8 changes: 0 additions & 8 deletions libdd-shared-runtime/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ pub trait Worker: std::fmt::Debug + MaybeSend {
/// Reset the worker state. Called in the child after a fork to cleanup parent state.
fn reset(&mut self) {}

/// Hook called after the worker has been paused (e.g. before a fork).
/// Default is a no-op.
async fn on_pause(&mut self) {}

/// Hook called when the app is shutting down. Can be used to flush remaining data.
async fn shutdown(&mut self) {}
}
Expand All @@ -66,10 +62,6 @@ impl Worker for Box<dyn Worker + Sync> {
(**self).reset()
}

async fn on_pause(&mut self) {
(**self).on_pause().await
}

async fn shutdown(&mut self) {
(**self).shutdown().await
}
Expand Down
Loading