7 changes: 7 additions & 0 deletions libdd-shared-runtime-ffi/src/shared_runtime.rs
@@ -23,6 +23,11 @@ pub enum SharedRuntimeErrorCode {
RuntimeCreation,
/// Shutdown timed out.
ShutdownTimedOut,
/// Borrowed mode: shutdown has been triggered, so new workers cannot be spawned.
RuntimeShuttingDown,
/// Borrowed mode: the operation would deadlock against the host runtime
/// (e.g. waiting on shutdown from inside a current-thread runtime).
WouldDeadlock,
/// An unexpected panic occurred inside the FFI call.
#[cfg(feature = "catch_panic")]
Panic,
@@ -54,6 +59,8 @@ impl From<SharedRuntimeError> for SharedRuntimeFFIError {
SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError,
SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation,
SharedRuntimeError::ShutdownTimedOut(_) => SharedRuntimeErrorCode::ShutdownTimedOut,
SharedRuntimeError::RuntimeShuttingDown => SharedRuntimeErrorCode::RuntimeShuttingDown,
SharedRuntimeError::WouldDeadlock => SharedRuntimeErrorCode::WouldDeadlock,
};
SharedRuntimeFFIError::new(code, &err.to_string())
}
15 changes: 9 additions & 6 deletions libdd-shared-runtime/src/lib.rs
@@ -8,17 +8,20 @@

//! A shared tokio runtime for running background workers across multiple components.
//!
//! This crate provides [`SharedRuntime`], which owns a single tokio runtime and manages
//! [`PausableWorker`]s on it. Components such as the trace exporter can share one runtime
//! instead of each creating their own, reducing thread and resource overhead.
//! Two runtime modes manage background workers; capabilities are enforced at compile time:
//!
//! [`SharedRuntime`] also provides fork-safety hooks (`before_fork`, `after_fork_parent`,
//! `after_fork_child`) that pause and restart workers around `fork()` calls, preventing
//! deadlocks in child processes.
//! - [`SharedRuntime`] (**owned mode**): owns a tokio runtime shared across components to reduce
//! thread overhead. Provides fork-safety hooks and synchronous `block_on`/`shutdown`.
//! - [`BorrowedRuntime`] (**borrowed mode**, native only): runs workers on a caller-owned runtime
//! via [`tokio::runtime::Handle`]. No fork hooks or synchronous `block_on`/`shutdown` — those
//! would touch a runtime we don't own. Workers it spawned can be stopped async or via a
//! `Condvar`-based path safe to call from inside the host runtime.

pub mod shared_runtime;
pub mod worker;

// Top-level re-exports for convenience
#[cfg(not(target_arch = "wasm32"))]
pub use shared_runtime::BorrowedRuntime;
pub use shared_runtime::{SharedRuntime, SharedRuntimeError, WorkerHandle, WorkerHandleError};
pub use worker::Worker;
259 changes: 259 additions & 0 deletions libdd-shared-runtime/src/shared_runtime/borrowed.rs
@@ -0,0 +1,259 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Borrowed-mode runtime.
//!
//! [`BorrowedRuntime`] runs workers on a caller-owned tokio runtime (via [`Handle`]), avoiding a
//! second tokio runtime in the process ("tokio-inside-tokio" panics).
//!
//! Fork hooks, synchronous `block_on`, and runtime shutdown are absent — those would touch a
//! runtime or tasks we don't own. Workers *we* spawned can still be stopped via
//! [`BorrowedRuntime::shutdown_async`] or the non-blocking
//! [`trigger_shutdown_signal`](BorrowedRuntime::trigger_shutdown_signal) +
//! [`wait_shutdown_done`](BorrowedRuntime::wait_shutdown_done) pair.
//!
//! [`wait_shutdown_done`](BorrowedRuntime::wait_shutdown_done) must be called from a thread that
//! is **not** an executor thread of the host runtime: it parks the calling thread on a condvar
//! while the shutdown tasks run on the host runtime's executor threads. Calling it from an
//! executor thread starves those tasks. A current-thread runtime is detected and rejected
//! eagerly; a multi-thread runtime configured with a single worker thread produces the same
//! deadlock but cannot be detected through the tokio `Handle` API — that case is a caller
//! responsibility.
//!
//! Shutdown is terminal: once triggered, [`spawn_worker`](BorrowedRuntime::spawn_worker) returns
//! [`SharedRuntimeError::RuntimeShuttingDown`]; create a new instance to resume.

use super::pausable_worker::{self, PausableWorker};
use super::{BoxedWorker, SharedRuntimeError, WorkerEntry, WorkerHandle, WorkerRegistry};
use crate::worker::Worker;
use libdd_common::MutexExt;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use tokio::runtime::{Handle, RuntimeFlavor};
use tracing::{debug, error};

/// A runtime that runs workers on a caller-owned tokio runtime via its [`Handle`].
///
/// See the [module docs](self) for the differences versus the owned
/// [`SharedRuntime`](super::SharedRuntime).
#[derive(Debug)]
pub struct BorrowedRuntime {
handle: Handle,
registry: WorkerRegistry,
shutdown_tracker: Arc<ShutdownTracker>,
}

impl BorrowedRuntime {
/// Runs workers on `handle`'s runtime without taking ownership of it.
pub fn from_handle(handle: Handle) -> Self {
Self {
handle,
registry: WorkerRegistry::new(),
shutdown_tracker: Arc::new(ShutdownTracker::default()),
}
}

/// Returns a clone of the caller-supplied tokio runtime handle.
pub fn runtime_handle(&self) -> Handle {
Contributor

Most `Handle` methods take `&self`. Is there a reason to clone preemptively here, rather than exposing a getter that returns a reference and letting the caller clone if needed?
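To make the two shapes concrete, here is a minimal sketch. It uses a hypothetical stand-in `Handle` type (a reference-counted wrapper, assumed to behave like a cheaply clonable runtime handle) rather than tokio's own type, and the method name `runtime_handle_cloned` is invented for the comparison.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a runtime handle: cloning only bumps a reference count.
#[derive(Clone, Debug)]
struct Handle(Arc<()>);

struct BorrowedRuntime {
    handle: Handle,
}

impl BorrowedRuntime {
    /// Shape used in the diff: every call returns an owned clone.
    fn runtime_handle_cloned(&self) -> Handle {
        self.handle.clone()
    }

    /// Alternative shape: return a reference and let the caller decide whether to clone.
    fn runtime_handle(&self) -> &Handle {
        &self.handle
    }
}

fn main() {
    let rt = BorrowedRuntime { handle: Handle(Arc::new(())) };

    // Borrowing does not touch the reference count.
    let borrowed = rt.runtime_handle();
    assert_eq!(Arc::strong_count(&borrowed.0), 1);

    // The caller clones only when it needs an owned handle (e.g. to move into a task).
    let owned = borrowed.clone();
    assert_eq!(Arc::strong_count(&owned.0), 2);

    // The cloning getter always pays for the clone.
    let also_owned = rt.runtime_handle_cloned();
    assert_eq!(Arc::strong_count(&also_owned.0), 3);
}
```

With the reference-returning getter, call sites that only need `&self` methods pay nothing, while call sites that move the handle into a spawned task add an explicit `.clone()`.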

self.handle.clone()
}

/// Spawn a [`Worker`] on the borrowed runtime.
///
/// `restart_on_fork` is accepted for API symmetry but ignored: borrowed mode has no fork hooks.
///
/// # Errors
/// Returns [`SharedRuntimeError::RuntimeShuttingDown`] after shutdown; other errors if the
/// worker fails to start.
pub fn spawn_worker<T: Worker + Sync + 'static>(
&self,
worker: T,
restart_on_fork: bool,
) -> Result<WorkerHandle, SharedRuntimeError> {
// Lock order: state → workers (matches both shutdown paths), so checking `triggered`
// under state then acquiring workers is race-free with concurrent triggers.
let state = self.shutdown_tracker.state.lock().map_err(|e| {
SharedRuntimeError::LockFailed(format!("shutdown tracker state mutex poisoned: {e}"))
})?;
if state.triggered {
return Err(SharedRuntimeError::RuntimeShuttingDown);
}
let mut workers_guard = self.registry.workers.lock_or_panic();
drop(state); // safe: a concurrent trigger still needs workers lock we hold

let boxed_worker: BoxedWorker = Box::new(worker);
debug!(?boxed_worker, "Spawning worker on BorrowedRuntime");
let mut pausable_worker = PausableWorker::new(boxed_worker);
pausable_worker.start(pausable_worker::tokio_spawn_fn(&self.handle))?;
let worker_id = self.registry.next_id();
workers_guard.push(WorkerEntry {
id: worker_id,
restart_on_fork,
worker: pausable_worker,
});
Ok(self.registry.worker_handle(worker_id))
}

/// Pause and shut down every worker this runtime spawned; marks shutdown as triggered.
///
/// Does not affect host-runtime tasks. Worker errors are logged. Use either this or the
/// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) pair — not both.
pub async fn shutdown_async(&self) {
// Set triggered before draining so concurrent spawn_worker calls are rejected first.
// Poison-tolerant: this function doesn't fail.
{
let mut state = self
.shutdown_tracker
.state
.lock()
Contributor

We are potentially locking a sync mutex inside a tokio runtime, which might block the executor's thread. Is that OK in the current context, and if so, why? I'm not sure I have the bigger picture. Are the other mutex-locking functions, such as `spawn_worker`, also expected to run from within a runtime? If so, we could just use a tokio async mutex. If the mutex can be locked and unlocked from both inside and outside the runtime, then I'm not sure what the right choice is.
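For context on this question: a `std::sync::Mutex` is commonly considered acceptable in async code when the critical section is short and the guard is never held across an `.await`. The sketch below illustrates that pattern under stated assumptions: `drain_workers` is a hypothetical stand-in for the registry drain, and `block_on` is a tiny busy-polling executor written only so the example runs without tokio.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct ShutdownState {
    triggered: bool,
}

/// Waker that does nothing; enough for a busy-polling executor.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, for illustration only (not tokio).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

/// Hypothetical stand-in for the async drain of registered workers.
async fn drain_workers() {}

async fn shutdown_async(state: &Mutex<ShutdownState>) {
    {
        // Short, non-blocking critical section; poison-tolerant like the diff.
        let mut guard = state.lock().unwrap_or_else(|poison| poison.into_inner());
        guard.triggered = true;
    } // guard is dropped here, before the first await point
    drain_workers().await;
}

fn main() {
    let state = Mutex::new(ShutdownState::default());
    block_on(shutdown_async(&state));
    assert!(state.lock().unwrap().triggered);
}
```

Because the guard's scope ends before `drain_workers().await`, the lock is only ever held for a flag write, whether the caller is inside or outside a runtime.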

.unwrap_or_else(|poison| poison.into_inner());
state.triggered = true;
}
self.registry.shutdown_async().await;
}

/// Mark shutdown as triggered and spawn fire-and-forget pause+shutdown tasks for every
/// registered worker, without blocking.
///
/// Pairs with [`wait_shutdown_done`](Self::wait_shutdown_done) for synchronous contexts
/// where `block_on` would deadlock.
///
/// # Errors
/// Returns [`SharedRuntimeError::LockFailed`] if a mutex was poisoned.
pub fn trigger_shutdown_signal(&self) -> Result<(), SharedRuntimeError> {
let handle = self.handle.clone();

// State → workers lock order: triggered is set before the registry is emptied,
// so a concurrent spawn_worker can't slip through.
let mut state = self.shutdown_tracker.state.lock().map_err(|e| {
SharedRuntimeError::LockFailed(format!("shutdown tracker state mutex poisoned: {e}"))
})?;
let workers = {
let mut workers_lock = self.registry.workers.lock().map_err(|e| {
SharedRuntimeError::LockFailed(format!("workers mutex poisoned: {e}"))
})?;
std::mem::take(&mut *workers_lock)
};
let count = workers.len();
state.triggered = true;
state.expected += count;
drop(state);

// Wake any waiter blocked in wait_shutdown_done before trigger was called.
if count == 0 {
self.shutdown_tracker.cv.notify_all();
return Ok(());
}

for mut entry in workers {
let tracker = self.shutdown_tracker.clone();
handle.spawn(async move {
if let Err(e) = entry.worker.pause().await {
error!("Worker failed to pause on shutdown trigger: {:?}", e);
} else {
entry.worker.shutdown().await;
}
// Poison-tolerant: panicking would skip the counter bump and block
// wait_shutdown_done.
let mut state = match tracker.state.lock() {
Ok(guard) => guard,
Err(poison) => {
error!("Shutdown tracker state mutex poisoned; counters may be inaccurate");
poison.into_inner()
}
};
state.completed += 1;
tracker.cv.notify_all();
});
}
Ok(())
}

/// Block the calling thread until all workers from
/// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) complete, or `timeout` elapses.
///
/// Idempotent. Must be called from a thread that is **not** an executor thread of the host
/// runtime — see the [module docs](self) for the full constraint. A current-thread host
/// runtime is detected and rejected; a single-worker multi-thread runtime is not detectable
/// and remains a caller responsibility.
///
/// # Errors
/// - [`SharedRuntimeError::WouldDeadlock`] if the host runtime is current-thread and there are
/// still outstanding shutdowns to wait on.
/// - [`SharedRuntimeError::ShutdownTimedOut`] on timeout.
pub fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), SharedRuntimeError> {
// Poison-tolerant: counter state has no invariants a panicking holder could break.
let state = self
.shutdown_tracker
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());

// Fast path: nothing to wait on (no workers triggered, or all already completed).
if state.completed >= state.expected {
return Ok(());
}

// Tasks spawned by trigger_shutdown_signal run on `self.handle`. If that runtime is
// current-thread, parking this thread on the condvar starves them and we deadlock
// until `timeout` fires; reject the call instead.
if self.handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
return Err(SharedRuntimeError::WouldDeadlock);
}

let (_state, res) = self
.shutdown_tracker
.cv
.wait_timeout_while(state, timeout, |s| s.completed < s.expected)
Aaalibaba42 marked this conversation as resolved.
Comment on lines +204 to +207

P2: Avoid blocking the only Tokio worker during borrowed shutdown

When `wait_shutdown_done` is called from a task running on a `new_multi_thread` runtime configured with one worker thread, this blocking `Condvar` wait parks the only executor thread. The pause/shutdown tasks from `trigger_shutdown_signal` were spawned onto that same handle, so they cannot be polled and shutdown simply times out. The current `RuntimeFlavor::CurrentThread` guard does not cover this valid multi-thread configuration; use `block_in_place` or an async wait path, or reject calls from the host runtime when no other worker can drive the shutdown tasks.

Contributor Author

`RuntimeFlavor::CurrentThread` doesn't cover `worker_threads(1)` multi-thread runtimes, and I can't detect that case programmatically, because tokio doesn't expose the worker count or distinguish executor threads from blocking threads through the `Handle` API.

I've changed the documentation to better reflect this as a caller responsibility.

Contributor

@yannham, Jun 1, 2026

I wonder whether this kind of requirement is practical. If the borrowed runtime is, downstream, some arbitrary client application's tokio runtime, do we really have any control over how it is configured? I'm not sure how `wait_shutdown_done` is called, but if it's called in an async context, I suppose we could design an async wait operation instead (e.g., when polled, the future checks whether `s.completed < s.expected`).
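A minimal sketch of what such an async wait could look like, using only the standard library. All names here (`AsyncTracker`, `WaitDone`, `wait_done`, `complete_one`, `expect`) are hypothetical and not part of the PR; the sketch assumes completing tasks wake stored wakers instead of notifying a `Condvar`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct State {
    expected: usize,
    completed: usize,
    waiters: Vec<Waker>,
}

/// Hypothetical async counterpart of the condvar-based tracker.
#[derive(Default)]
struct AsyncTracker {
    state: Mutex<State>,
}

impl AsyncTracker {
    fn expect(&self, n: usize) {
        self.state.lock().unwrap().expected += n;
    }

    /// Called by each shutdown task when it finishes.
    fn complete_one(&self) {
        let waiters = {
            let mut s = self.state.lock().unwrap();
            s.completed += 1;
            std::mem::take(&mut s.waiters)
        };
        // Wake outside the lock so woken tasks can re-poll immediately.
        for w in waiters {
            w.wake();
        }
    }
}

/// Future that resolves once `completed >= expected`.
struct WaitDone {
    tracker: Arc<AsyncTracker>,
}

fn wait_done(tracker: &Arc<AsyncTracker>) -> WaitDone {
    WaitDone { tracker: Arc::clone(tracker) }
}

impl Future for WaitDone {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut s = self.tracker.state.lock().unwrap();
        if s.completed >= s.expected {
            Poll::Ready(())
        } else {
            // Register under the same lock that guards the counters, so a
            // completion cannot slip in between the check and the registration.
            s.waiters.push(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Test waker that counts how often it is woken.
struct CountingWake(AtomicUsize);
impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let tracker = Arc::new(AsyncTracker::default());
    tracker.expect(2);
    let wake = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&wake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(wait_done(&tracker));

    assert!(fut.as_mut().poll(&mut cx).is_pending());
    tracker.complete_one();
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    tracker.complete_one();
    assert!(fut.as_mut().poll(&mut cx).is_ready());
    assert_eq!(wake.0.load(Ordering::SeqCst), 2);
}
```

Because awaiting this future yields to the executor rather than parking the thread, it would not starve shutdown tasks on a single-worker runtime. A timeout is omitted here; a caller could layer one on top with a timer future such as `tokio::time::timeout`.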

.unwrap_or_else(|err| err.into_inner());
if res.timed_out() {
Err(SharedRuntimeError::ShutdownTimedOut(timeout))
} else {
Ok(())
}
}
}

impl Drop for BorrowedRuntime {
/// Fire-and-forget pause for any still-running workers; never blocks.
///
/// For a guaranteed drain, call [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) +
/// [`wait_shutdown_done`](Self::wait_shutdown_done) before dropping.
fn drop(&mut self) {
let workers = {
let mut guard = match self.registry.workers.lock() {
Ok(g) => g,
Err(poison) => poison.into_inner(),
};
Comment on lines +224 to +227
Contributor

Nit

Suggested change
- let mut guard = match self.registry.workers.lock() {
- Ok(g) => g,
- Err(poison) => poison.into_inner(),
- };
+ let mut guard = self.registry.workers.lock().unwrap_or_else(|poison| poison.into_inner());

std::mem::take(&mut *guard)
};
if workers.is_empty() {
return;
}
let handle = self.handle.clone();
for mut entry in workers {
handle.spawn(async move {
if let Err(e) = entry.worker.pause().await {
debug!("Worker failed to pause during borrowed-mode Drop: {:?}", e);
}
});
}
}
}

/// Condvar-based completion tracker for [`BorrowedRuntime::trigger_shutdown_signal`].
#[derive(Debug, Default)]
struct ShutdownTracker {
state: Mutex<ShutdownState>,
cv: Condvar,
}

#[derive(Debug, Default)]
struct ShutdownState {
/// Total expected completions; bumped by `trigger_shutdown_signal`.
expected: usize,
/// Workers that have completed shutdown.
completed: usize,
/// Set by either shutdown path; blocks further `spawn_worker` calls.
triggered: bool,
}
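
The condvar-based completion tracking used by `trigger_shutdown_signal` and `wait_shutdown_done` can be illustrated in isolation. The following is a simplified sketch, assuming plain OS threads in place of tasks on the host runtime; `Tracker`, `wait_done`, and `run` are hypothetical names, and the `triggered` flag is left out.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Default)]
struct State {
    expected: usize,
    completed: usize,
}

#[derive(Default)]
struct Tracker {
    state: Mutex<State>,
    cv: Condvar,
}

impl Tracker {
    /// Blocks until `completed >= expected` or `timeout` elapses.
    /// Returns `true` if all completions arrived in time.
    fn wait_done(&self, timeout: Duration) -> bool {
        let guard = self.state.lock().unwrap_or_else(|p| p.into_inner());
        let (_guard, res) = self
            .cv
            .wait_timeout_while(guard, timeout, |s| s.completed < s.expected)
            .unwrap_or_else(|p| p.into_inner());
        !res.timed_out()
    }
}

/// Spawns `workers` threads that each report one completion, then waits for them.
fn run(workers: usize, timeout: Duration) -> bool {
    let tracker = Arc::new(Tracker::default());
    tracker.state.lock().unwrap().expected += workers;

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let t = Arc::clone(&tracker);
            thread::spawn(move || {
                // Stand-in for the pause + shutdown work of one worker.
                thread::sleep(Duration::from_millis(10));
                t.state.lock().unwrap().completed += 1;
                t.cv.notify_all();
            })
        })
        .collect();

    let ok = tracker.wait_done(timeout);
    for h in handles {
        h.join().unwrap();
    }
    ok
}

fn main() {
    // All workers complete well within the timeout.
    assert!(run(3, Duration::from_secs(5)));

    // Nothing ever completes: the wait times out instead of hanging.
    let stuck = Tracker::default();
    stuck.state.lock().unwrap().expected = 1;
    assert!(!stuck.wait_done(Duration::from_millis(20)));
}
```

The second case mirrors the starvation scenario from the review thread: if the waiting thread is the only one able to run the completing tasks, the counter never advances and the wait ends only when the timeout fires.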