From e0977a0be05969bfe89dc11794ad3ff5b63caac2 Mon Sep 17 00:00:00 2001 From: Jules Wiriath Date: Fri, 29 May 2026 16:00:33 +0200 Subject: [PATCH 1/3] feat: borrowed mode for the shared runtime chore: fmt --- .../src/shared_runtime.rs | 3 + libdd-shared-runtime/src/lib.rs | 15 +- .../src/shared_runtime/borrowed.rs | 234 +++++++++++++++++ .../src/shared_runtime/mod.rs | 248 ++++++++++++++---- 4 files changed, 445 insertions(+), 55 deletions(-) create mode 100644 libdd-shared-runtime/src/shared_runtime/borrowed.rs diff --git a/libdd-shared-runtime-ffi/src/shared_runtime.rs b/libdd-shared-runtime-ffi/src/shared_runtime.rs index 7a9a0ac084..a76e1a3b68 100644 --- a/libdd-shared-runtime-ffi/src/shared_runtime.rs +++ b/libdd-shared-runtime-ffi/src/shared_runtime.rs @@ -23,6 +23,8 @@ pub enum SharedRuntimeErrorCode { RuntimeCreation, /// Shutdown timed out. ShutdownTimedOut, + /// Borrowed mode: shutdown has been triggered, so new workers cannot be spawned. + RuntimeShuttingDown, /// An unexpected panic occurred inside the FFI call. #[cfg(feature = "catch_panic")] Panic, @@ -54,6 +56,7 @@ impl From for SharedRuntimeFFIError { SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError, SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation, SharedRuntimeError::ShutdownTimedOut(_) => SharedRuntimeErrorCode::ShutdownTimedOut, + SharedRuntimeError::RuntimeShuttingDown => SharedRuntimeErrorCode::RuntimeShuttingDown, }; SharedRuntimeFFIError::new(code, &err.to_string()) } diff --git a/libdd-shared-runtime/src/lib.rs b/libdd-shared-runtime/src/lib.rs index e9dc0ee642..7de7475521 100644 --- a/libdd-shared-runtime/src/lib.rs +++ b/libdd-shared-runtime/src/lib.rs @@ -8,17 +8,20 @@ //! A shared tokio runtime for running background workers across multiple components. //! -//! This crate provides [`SharedRuntime`], which owns a single tokio runtime and manages -//! [`PausableWorker`]s on it. 
Components such as the trace exporter can share one runtime -//! instead of each creating their own, reducing thread and resource overhead. +//! Two runtime modes manage background workers; capabilities are enforced at compile time: //! -//! [`SharedRuntime`] also provides fork-safety hooks (`before_fork`, `after_fork_parent`, -//! `after_fork_child`) that pause and restart workers around `fork()` calls, preventing -//! deadlocks in child processes. +//! - [`SharedRuntime`] (**owned mode**): owns a tokio runtime shared across components to reduce +//! thread overhead. Provides fork-safety hooks and synchronous `block_on`/`shutdown`. +//! - [`BorrowedRuntime`] (**borrowed mode**, native only): runs workers on a caller-owned runtime +//! via [`tokio::runtime::Handle`]. No fork hooks or synchronous `block_on`/`shutdown` — those +//! would touch a runtime we don't own. Workers it spawned can be stopped async or via a +//! `Condvar`-based wait that must not run on one of the host runtime's executor threads. pub mod shared_runtime; pub mod worker; // Top-level re-exports for convenience +#[cfg(not(target_arch = "wasm32"))] +pub use shared_runtime::BorrowedRuntime; pub use shared_runtime::{SharedRuntime, SharedRuntimeError, WorkerHandle, WorkerHandleError}; pub use worker::Worker; diff --git a/libdd-shared-runtime/src/shared_runtime/borrowed.rs b/libdd-shared-runtime/src/shared_runtime/borrowed.rs new file mode 100644 index 0000000000..fd31d77a19 --- /dev/null +++ b/libdd-shared-runtime/src/shared_runtime/borrowed.rs @@ -0,0 +1,234 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Borrowed-mode runtime. +//! +//! [`BorrowedRuntime`] runs workers on a caller-owned tokio runtime (via [`Handle`]), avoiding a +//! second tokio runtime in the process ("tokio-inside-tokio" panics). +//! +//! Fork hooks, synchronous `block_on`, and runtime shutdown are absent — those would touch a +//! runtime or tasks we don't own. 
Workers *we* spawned can still be stopped via +//! [`BorrowedRuntime::shutdown_async`] or the non-blocking +//! [`trigger_shutdown_signal`](BorrowedRuntime::trigger_shutdown_signal) + +//! [`wait_shutdown_done`](BorrowedRuntime::wait_shutdown_done) pair (safe from inside the host +//! runtime). +//! +//! Shutdown is terminal: once triggered, [`spawn_worker`](BorrowedRuntime::spawn_worker) returns +//! [`SharedRuntimeError::RuntimeShuttingDown`]; create a new instance to resume. + +use super::pausable_worker::{self, PausableWorker}; +use super::{BoxedWorker, SharedRuntimeError, WorkerEntry, WorkerHandle, WorkerRegistry}; +use crate::worker::Worker; +use libdd_common::MutexExt; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; +use tokio::runtime::Handle; +use tracing::{debug, error}; + +/// A runtime that runs workers on a caller-owned tokio runtime via its [`Handle`]. +/// +/// See the [module docs](self) for the differences versus the owned +/// [`SharedRuntime`](super::SharedRuntime). +#[derive(Debug)] +pub struct BorrowedRuntime { + handle: Handle, + registry: WorkerRegistry, + shutdown_tracker: Arc, +} + +impl BorrowedRuntime { + /// Runs workers on `handle`'s runtime without taking ownership of it. + pub fn from_handle(handle: Handle) -> Self { + Self { + handle, + registry: WorkerRegistry::new(), + shutdown_tracker: Arc::new(ShutdownTracker::default()), + } + } + + /// Returns a clone of the caller-supplied tokio runtime handle. + pub fn runtime_handle(&self) -> Handle { + self.handle.clone() + } + + /// Spawn a [`Worker`] on the borrowed runtime. + /// + /// `restart_on_fork` is accepted for API symmetry but ignored: borrowed mode has no fork hooks. + /// + /// # Errors + /// Returns [`SharedRuntimeError::RuntimeShuttingDown`] after shutdown; other errors if the + /// worker fails to start. 
+ pub fn spawn_worker( + &self, + worker: T, + restart_on_fork: bool, + ) -> Result { + // Lock order: state → workers (matches both shutdown paths), so checking `triggered` + // under state then acquiring workers is race-free with concurrent triggers. + let state = self.shutdown_tracker.state.lock().map_err(|e| { + SharedRuntimeError::LockFailed(format!("shutdown tracker state mutex poisoned: {e}")) + })?; + if state.triggered { + return Err(SharedRuntimeError::RuntimeShuttingDown); + } + let mut workers_guard = self.registry.workers.lock_or_panic(); + drop(state); // safe: a concurrent trigger still needs workers lock we hold + + let boxed_worker: BoxedWorker = Box::new(worker); + debug!(?boxed_worker, "Spawning worker on BorrowedRuntime"); + let mut pausable_worker = PausableWorker::new(boxed_worker); + pausable_worker.start(pausable_worker::tokio_spawn_fn(&self.handle))?; + let worker_id = self.registry.next_id(); + workers_guard.push(WorkerEntry { + id: worker_id, + restart_on_fork, + worker: pausable_worker, + }); + Ok(self.registry.worker_handle(worker_id)) + } + + /// Pause and shut down every worker this runtime spawned; marks shutdown as triggered. + /// + /// Does not affect host-runtime tasks. Worker errors are logged. Use either this or the + /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) pair — not both. + pub async fn shutdown_async(&self) { + // Set triggered before draining so concurrent spawn_worker calls are rejected first. + // Poison-tolerant: this function doesn't fail. + { + let mut state = self + .shutdown_tracker + .state + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + state.triggered = true; + } + self.registry.shutdown_async().await; + } + + /// Mark shutdown as triggered and spawn fire-and-forget pause+shutdown tasks for every + /// registered worker, without blocking. + /// + /// Pairs with [`wait_shutdown_done`](Self::wait_shutdown_done) for synchronous contexts + /// where `block_on` would deadlock. 
+ /// + /// # Errors + /// Returns [`SharedRuntimeError::LockFailed`] if a mutex was poisoned. + pub fn trigger_shutdown_signal(&self) -> Result<(), SharedRuntimeError> { + let handle = self.handle.clone(); + + // State → workers lock order: triggered is set before the registry is emptied, + // so a concurrent spawn_worker can't slip through. + let mut state = self.shutdown_tracker.state.lock().map_err(|e| { + SharedRuntimeError::LockFailed(format!("shutdown tracker state mutex poisoned: {e}")) + })?; + let workers = { + let mut workers_lock = self.registry.workers.lock().map_err(|e| { + SharedRuntimeError::LockFailed(format!("workers mutex poisoned: {e}")) + })?; + std::mem::take(&mut *workers_lock) + }; + let count = workers.len(); + state.triggered = true; + state.expected += count; + drop(state); + + // Wake any waiter blocked in wait_shutdown_done before trigger was called. + if count == 0 { + self.shutdown_tracker.cv.notify_all(); + return Ok(()); + } + + for mut entry in workers { + let tracker = self.shutdown_tracker.clone(); + handle.spawn(async move { + if let Err(e) = entry.worker.pause().await { + error!("Worker failed to pause on shutdown trigger: {:?}", e); + } else { + entry.worker.shutdown().await; + } + // Poison-tolerant: panicking would skip the counter bump and block + // wait_shutdown_done. + let mut state = match tracker.state.lock() { + Ok(guard) => guard, + Err(poison) => { + error!("Shutdown tracker state mutex poisoned; counters may be inaccurate"); + poison.into_inner() + } + }; + state.completed += 1; + tracker.cv.notify_all(); + }); + } + Ok(()) + } + + /// Block the calling thread until all workers from + /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) complete, or `timeout` elapses. + /// + /// Uses a [`Condvar`], so safe to call from inside the host tokio runtime. Idempotent. + /// + /// # Errors + /// Returns [`SharedRuntimeError::ShutdownTimedOut`] on timeout. 
+ pub fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), SharedRuntimeError> { + // Poison-tolerant: counter state has no invariants a panicking holder could break. + let state = self + .shutdown_tracker + .state + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + let (_state, res) = self + .shutdown_tracker + .cv + .wait_timeout_while(state, timeout, |s| s.completed < s.expected) + .unwrap_or_else(|err| err.into_inner()); + if res.timed_out() { + Err(SharedRuntimeError::ShutdownTimedOut(timeout)) + } else { + Ok(()) + } + } +} + +impl Drop for BorrowedRuntime { + /// Fire-and-forget pause for any still-running workers; never blocks. + /// + /// For a guaranteed drain, call [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) + + /// [`wait_shutdown_done`](Self::wait_shutdown_done) before dropping. + fn drop(&mut self) { + let workers = { + let mut guard = match self.registry.workers.lock() { + Ok(g) => g, + Err(poison) => poison.into_inner(), + }; + std::mem::take(&mut *guard) + }; + if workers.is_empty() { + return; + } + let handle = self.handle.clone(); + for mut entry in workers { + handle.spawn(async move { + if let Err(e) = entry.worker.pause().await { + debug!("Worker failed to pause during borrowed-mode Drop: {:?}", e); + } + }); + } + } +} + +/// Condvar-based completion tracker for [`BorrowedRuntime::trigger_shutdown_signal`]. +#[derive(Debug, Default)] +struct ShutdownTracker { + state: Mutex, + cv: Condvar, +} + +#[derive(Debug, Default)] +struct ShutdownState { + /// Total expected completions; bumped by `trigger_shutdown_signal`. + expected: usize, + /// Workers that have completed shutdown. + completed: usize, + /// Set by either shutdown path; blocks further `spawn_worker` calls. 
+ triggered: bool, +} diff --git a/libdd-shared-runtime/src/shared_runtime/mod.rs b/libdd-shared-runtime/src/shared_runtime/mod.rs index 058bf730a5..ff6b684c9c 100644 --- a/libdd-shared-runtime/src/shared_runtime/mod.rs +++ b/libdd-shared-runtime/src/shared_runtime/mod.rs @@ -10,11 +10,16 @@ pub(crate) mod pausable_worker; +#[cfg(not(target_arch = "wasm32"))] +mod borrowed; +#[cfg(not(target_arch = "wasm32"))] +pub use borrowed::BorrowedRuntime; + use crate::worker::Worker; use futures::stream::{FuturesUnordered, StreamExt}; use libdd_common::MutexExt; use pausable_worker::{PausableWorker, PausableWorkerError}; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::{fmt, io}; use tracing::{debug, error}; @@ -26,7 +31,6 @@ use tracing::{debug, error}; mod native { use super::*; use pausable_worker::tokio_spawn_fn; - use std::sync::atomic::Ordering; use tokio::runtime::{Builder, Runtime}; fn build_runtime() -> Result { @@ -40,8 +44,7 @@ mod native { pub(in super::super) fn new_native() -> Result { Ok(Self { runtime: Arc::new(Mutex::new(Some(Arc::new(build_runtime()?)))), - workers: Arc::new(Mutex::new(Vec::new())), - next_worker_id: AtomicU64::new(1), + registry: WorkerRegistry::new(), }) } @@ -87,7 +90,7 @@ mod native { // starting; after_fork_parent/child will start the worker on the // new runtime. 
let runtime_guard = self.runtime.lock_or_panic(); - let mut workers_guard = self.workers.lock_or_panic(); + let mut workers_guard = self.registry.workers.lock_or_panic(); if let Some(rt) = runtime_guard.as_ref() { if let Err(e) = pausable_worker.start(tokio_spawn_fn(rt.handle())) { @@ -95,7 +98,7 @@ mod native { } } - let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed); + let worker_id = self.registry.next_id(); workers_guard.push(WorkerEntry { id: worker_id, @@ -103,10 +106,7 @@ mod native { worker: pausable_worker, }); - Ok(WorkerHandle { - worker_id, - workers: self.workers.clone(), - }) + Ok(self.registry.worker_handle(worker_id)) } /// Hook to be called before forking. @@ -120,7 +120,7 @@ mod native { pub fn before_fork(&self) { debug!("before_fork: pausing all workers"); if let Some(runtime) = self.runtime.lock_or_panic().take() { - let mut workers_lock = self.workers.lock_or_panic(); + let mut workers_lock = self.registry.workers.lock_or_panic(); runtime.block_on(async { let futures: FuturesUnordered<_> = workers_lock .iter_mut() @@ -163,7 +163,7 @@ mod native { .clone(); drop(runtime_lock); - let mut workers_lock = self.workers.lock_or_panic(); + let mut workers_lock = self.registry.workers.lock_or_panic(); for worker_entry in workers_lock.iter_mut() { worker_entry.worker.start(tokio_spawn_fn(&handle))?; @@ -192,7 +192,7 @@ mod native { .clone(); drop(runtime_lock); - let mut workers_lock = self.workers.lock_or_panic(); + let mut workers_lock = self.registry.workers.lock_or_panic(); workers_lock.retain(|entry| entry.restart_on_fork); @@ -256,6 +256,60 @@ mod native { type BoxedWorker = Box; +/// Shared bookkeeping for the set of [`PausableWorker`]s managed by a runtime. +/// +/// Both [`SharedRuntime`] and [`BorrowedRuntime`] embed a `WorkerRegistry` so the +/// worker-tracking, id allocation, and async shutdown logic is written once. 
+#[derive(Debug)] +struct WorkerRegistry { + workers: Arc>>, + next_worker_id: AtomicU64, +} + +impl WorkerRegistry { + fn new() -> Self { + Self { + workers: Arc::new(Mutex::new(Vec::new())), + next_worker_id: AtomicU64::new(1), + } + } + + fn next_id(&self) -> u64 { + self.next_worker_id.fetch_add(1, Ordering::Relaxed) + } + + fn worker_handle(&self, worker_id: u64) -> WorkerHandle { + WorkerHandle { + worker_id, + workers: self.workers.clone(), + } + } + + /// Pause then shut down every registered worker, draining the registry. + /// + /// Worker errors are logged but do not cause the function to fail. + async fn shutdown_async(&self) { + debug!("Shutting down all workers asynchronously"); + let workers = { + let mut workers_lock = self.workers.lock_or_panic(); + std::mem::take(&mut *workers_lock) + }; + + let futures: FuturesUnordered<_> = workers + .into_iter() + .map(|mut worker_entry| async move { + if let Err(e) = worker_entry.worker.pause().await { + error!("Worker failed to shutdown: {:?}", e); + return; + } + worker_entry.worker.shutdown().await; + }) + .collect(); + + futures.collect::<()>().await; + } +} + #[derive(Debug)] struct WorkerEntry { id: u64, @@ -345,6 +399,8 @@ pub enum SharedRuntimeError { RuntimeCreation(io::Error), /// Shutdown timed out. ShutdownTimedOut(std::time::Duration), + /// Borrowed mode: shutdown has been triggered, so new workers cannot be spawned. + RuntimeShuttingDown, } impl fmt::Display for SharedRuntimeError { @@ -361,6 +417,9 @@ impl fmt::Display for SharedRuntimeError { Self::ShutdownTimedOut(duration) => { write!(f, "Shutdown timed out after {:?}", duration) } + Self::RuntimeShuttingDown => { + write!(f, "Runtime is shutting down; no new workers can be spawned") + } } } } @@ -389,14 +448,13 @@ impl From for SharedRuntimeError { /// on the JS event loop. /// /// # Mutex lock order -/// When locking both [Self::runtime] and [Self::workers], the mutex must be locked in the order of -/// the fields in the struct. 
When possible avoid holding both locks simultaneously. +/// When locking both [Self::runtime] and the registry's worker mutex, the runtime mutex must +/// be locked first. When possible avoid holding both locks simultaneously. #[derive(Debug)] pub struct SharedRuntime { #[cfg(not(target_arch = "wasm32"))] runtime: Arc>>>, - workers: Arc>>, - next_worker_id: AtomicU64, + registry: WorkerRegistry, } impl SharedRuntime { @@ -417,8 +475,7 @@ impl SharedRuntime { #[cfg(target_arch = "wasm32")] { Ok(Self { - workers: Arc::new(Mutex::new(Vec::new())), - next_worker_id: AtomicU64::new(1), + registry: WorkerRegistry::new(), }) } } @@ -430,13 +487,11 @@ impl SharedRuntime { worker: T, restart_on_fork: bool, ) -> Result { - use std::sync::atomic::Ordering; - let boxed_worker: BoxedWorker = Box::new(worker); debug!(?boxed_worker, "Spawning worker on SharedRuntime"); let mut pausable_worker = PausableWorker::new(boxed_worker); - let mut workers_guard = self.workers.lock_or_panic(); + let mut workers_guard = self.registry.workers.lock_or_panic(); if let Err(e) = pausable_worker.start(|future| { use futures_util::FutureExt; @@ -447,7 +502,7 @@ impl SharedRuntime { return Err(e.into()); } - let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed); + let worker_id = self.registry.next_id(); workers_guard.push(WorkerEntry { id: worker_id, @@ -455,10 +510,7 @@ impl SharedRuntime { worker: pausable_worker, }); - Ok(WorkerHandle { - worker_id, - workers: self.workers.clone(), - }) + Ok(self.registry.worker_handle(worker_id)) } /// Shutdown all workers asynchronously. @@ -468,24 +520,7 @@ impl SharedRuntime { /// /// Worker errors are logged but do not cause the function to fail. 
pub async fn shutdown_async(&self) { - debug!("Shutting down all workers asynchronously"); - let workers = { - let mut workers_lock = self.workers.lock_or_panic(); - std::mem::take(&mut *workers_lock) - }; - - let futures: FuturesUnordered<_> = workers - .into_iter() - .map(|mut worker_entry| async move { - if let Err(e) = worker_entry.worker.pause().await { - error!("Worker failed to shutdown: {:?}", e); - return; - } - worker_entry.worker.shutdown().await; - }) - .collect(); - - futures.collect::<()>().await; + self.registry.shutdown_async().await; } } @@ -542,7 +577,7 @@ mod tests { let result = shared_runtime.spawn_worker(worker, true); assert!(result.is_ok()); - assert_eq!(shared_runtime.workers.lock_or_panic().len(), 1); + assert_eq!(shared_runtime.registry.workers.lock_or_panic().len(), 1); // Verify the worker is actually running by receiving its first output assert_eq!( @@ -560,7 +595,7 @@ mod tests { let (worker, receiver) = make_test_worker(); let handle = shared_runtime.spawn_worker(worker, true).unwrap(); - assert_eq!(shared_runtime.workers.lock_or_panic().len(), 1); + assert_eq!(shared_runtime.registry.workers.lock_or_panic().len(), 1); // Wait for at least one run before stopping receiver @@ -571,7 +606,7 @@ mod tests { assert!(handle.stop().await.is_ok()); }); - assert_eq!(shared_runtime.workers.lock_or_panic().len(), 0); + assert_eq!(shared_runtime.registry.workers.lock_or_panic().len(), 0); // Drain all messages after stop — the last one must be the shutdown sentinel let mut last = receiver @@ -688,7 +723,7 @@ mod tests { assert!(shared_runtime.after_fork_child().is_ok()); // Worker must be removed from the list - assert_eq!(shared_runtime.workers.lock_or_panic().len(), 0); + assert_eq!(shared_runtime.registry.workers.lock_or_panic().len(), 0); // Worker must not produce any more messages (not restarted, not shut down) assert!( @@ -696,4 +731,119 @@ mod tests { "worker should not run or shut down after fork in child when restart_on_fork is false" 
); } + + /// Borrowed mode: a worker spawned on a host runtime's handle runs on that host runtime. + #[test] + fn test_borrowed_runtime_runs_worker() { + let host = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + let rt = BorrowedRuntime::from_handle(host.handle().clone()); + + let (worker, receiver) = make_test_worker(); + let _handle = rt.spawn_worker(worker, true).unwrap(); + + assert_eq!( + receiver + .recv_timeout(Duration::from_secs(1)) + .expect("worker did not run on host runtime"), + 0 + ); + } + + /// Borrowed mode: workers can be shut down via the Condvar path without `block_on`. + #[test] + fn test_borrowed_trigger_and_wait_shutdown() { + let host = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + let rt = BorrowedRuntime::from_handle(host.handle().clone()); + + let (worker, receiver) = make_test_worker(); + let _ = rt.spawn_worker(worker, true).unwrap(); + + // Wait for the worker to advance at least once on the host runtime. + receiver + .recv_timeout(Duration::from_secs(1)) + .expect("worker did not run on host runtime"); + + rt.trigger_shutdown_signal() + .expect("trigger_shutdown_signal failed"); + rt.wait_shutdown_done(Duration::from_secs(5)) + .expect("shutdown did not complete in time"); + + // Drain remaining messages — the last one must be the shutdown sentinel (-1). + let mut last = receiver + .recv_timeout(Duration::from_secs(1)) + .expect("shutdown sentinel was not produced"); + while let Ok(v) = receiver.try_recv() { + last = v; + } + assert_eq!(last, -1); + } + + /// `wait_shutdown_done` returns immediately when no workers were ever registered. 
+ #[test] + fn test_borrowed_wait_shutdown_done_no_workers() { + let host = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let rt = BorrowedRuntime::from_handle(host.handle().clone()); + + rt.trigger_shutdown_signal().unwrap(); + rt.wait_shutdown_done(Duration::from_secs(1)) + .expect("wait_shutdown_done should return immediately with no workers"); + } + + /// After either shutdown path runs, `spawn_worker` must reject further workers. + #[test] + fn test_borrowed_spawn_after_shutdown_rejected() { + let host = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + + // After trigger_shutdown_signal. + let rt = BorrowedRuntime::from_handle(host.handle().clone()); + rt.trigger_shutdown_signal().unwrap(); + let (worker, _) = make_test_worker(); + assert!(matches!( + rt.spawn_worker(worker, true), + Err(SharedRuntimeError::RuntimeShuttingDown) + )); + + // After shutdown_async. + let rt = BorrowedRuntime::from_handle(host.handle().clone()); + host.block_on(rt.shutdown_async()); + let (worker, _) = make_test_worker(); + assert!(matches!( + rt.spawn_worker(worker, true), + Err(SharedRuntimeError::RuntimeShuttingDown) + )); + } + + /// Dropping a `BorrowedRuntime` from inside the host tokio runtime must not panic + /// (the Drop impl is fire-and-forget and never calls `block_on`). + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_borrowed_drop_inside_tokio_runtime() { + let rt = BorrowedRuntime::from_handle(tokio::runtime::Handle::current()); + + let (worker, _receiver) = make_test_worker(); + let _ = rt.spawn_worker(worker, true).unwrap(); + + // Give the worker a moment to start on the host runtime. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Dropping from inside the host runtime must not panic. + drop(rt); + + // Let the fire-and-forget pause task run. 
+ tokio::time::sleep(Duration::from_millis(50)).await; + } } From 4e49301fa35772c334a13d51fe6f6606f9100209 Mon Sep 17 00:00:00 2001 From: Jules Wiriath Date: Fri, 29 May 2026 16:18:10 +0200 Subject: [PATCH 2/3] fix: introduce WouldDeadLock error type --- .../src/shared_runtime.rs | 4 ++ .../src/shared_runtime/borrowed.rs | 24 ++++++++++-- .../src/shared_runtime/mod.rs | 37 +++++++++++++++++++ 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/libdd-shared-runtime-ffi/src/shared_runtime.rs b/libdd-shared-runtime-ffi/src/shared_runtime.rs index a76e1a3b68..bc8c42a74e 100644 --- a/libdd-shared-runtime-ffi/src/shared_runtime.rs +++ b/libdd-shared-runtime-ffi/src/shared_runtime.rs @@ -25,6 +25,9 @@ pub enum SharedRuntimeErrorCode { ShutdownTimedOut, /// Borrowed mode: shutdown has been triggered, so new workers cannot be spawned. RuntimeShuttingDown, + /// Borrowed mode: the operation would deadlock against the host runtime + /// (e.g. waiting on shutdown from inside a current-thread runtime). + WouldDeadlock, /// An unexpected panic occurred inside the FFI call. 
#[cfg(feature = "catch_panic")] Panic, @@ -57,6 +60,7 @@ impl From for SharedRuntimeFFIError { SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation, SharedRuntimeError::ShutdownTimedOut(_) => SharedRuntimeErrorCode::ShutdownTimedOut, SharedRuntimeError::RuntimeShuttingDown => SharedRuntimeErrorCode::RuntimeShuttingDown, + SharedRuntimeError::WouldDeadlock => SharedRuntimeErrorCode::WouldDeadlock, }; SharedRuntimeFFIError::new(code, &err.to_string()) } diff --git a/libdd-shared-runtime/src/shared_runtime/borrowed.rs b/libdd-shared-runtime/src/shared_runtime/borrowed.rs index fd31d77a19..b1d6ee74ad 100644 --- a/libdd-shared-runtime/src/shared_runtime/borrowed.rs +++ b/libdd-shared-runtime/src/shared_runtime/borrowed.rs @@ -22,7 +22,7 @@ use crate::worker::Worker; use libdd_common::MutexExt; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; -use tokio::runtime::Handle; +use tokio::runtime::{Handle, RuntimeFlavor}; use tracing::{debug, error}; /// A runtime that runs workers on a caller-owned tokio runtime via its [`Handle`]. @@ -165,10 +165,15 @@ impl BorrowedRuntime { /// Block the calling thread until all workers from /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) complete, or `timeout` elapses. /// - /// Uses a [`Condvar`], so safe to call from inside the host tokio runtime. Idempotent. + /// Idempotent. Safe to call from inside the host runtime only when it is multi-threaded + /// with at least one other worker free to drive the pending shutdown tasks. Calling from + /// a current-thread runtime would block the only executor thread and starve those tasks, + /// so this is rejected up front with [`SharedRuntimeError::WouldDeadlock`]. /// /// # Errors - /// Returns [`SharedRuntimeError::ShutdownTimedOut`] on timeout. + /// - [`SharedRuntimeError::WouldDeadlock`] if the host runtime is current-thread and there are + /// still outstanding shutdowns to wait on. 
+ /// - [`SharedRuntimeError::ShutdownTimedOut`] on timeout. pub fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), SharedRuntimeError> { // Poison-tolerant: counter state has no invariants a panicking holder could break. let state = self @@ -176,6 +181,19 @@ impl BorrowedRuntime { .state .lock() .unwrap_or_else(|poison| poison.into_inner()); + + // Fast path: nothing to wait on (no workers triggered, or all already completed). + if state.completed >= state.expected { + return Ok(()); + } + + // Tasks spawned by trigger_shutdown_signal run on `self.handle`. If that runtime is + // current-thread, parking this thread on the condvar starves them and we deadlock + // until `timeout` fires; reject the call instead. + if self.handle.runtime_flavor() == RuntimeFlavor::CurrentThread { + return Err(SharedRuntimeError::WouldDeadlock); + } + let (_state, res) = self .shutdown_tracker .cv diff --git a/libdd-shared-runtime/src/shared_runtime/mod.rs b/libdd-shared-runtime/src/shared_runtime/mod.rs index ff6b684c9c..2d17d18cec 100644 --- a/libdd-shared-runtime/src/shared_runtime/mod.rs +++ b/libdd-shared-runtime/src/shared_runtime/mod.rs @@ -401,6 +401,10 @@ pub enum SharedRuntimeError { ShutdownTimedOut(std::time::Duration), /// Borrowed mode: shutdown has been triggered, so new workers cannot be spawned. RuntimeShuttingDown, + /// Borrowed mode: the operation would deadlock against the host runtime + /// (e.g. blocking the only executor thread of a current-thread runtime + /// while waiting on tasks spawned onto the same runtime). 
+ WouldDeadlock, } impl fmt::Display for SharedRuntimeError { @@ -420,6 +424,13 @@ impl fmt::Display for SharedRuntimeError { Self::RuntimeShuttingDown => { write!(f, "Runtime is shutting down; no new workers can be spawned") } + Self::WouldDeadlock => { + write!( + f, + "Operation would deadlock against the host runtime; call from a thread \ + not owned by the host runtime, or use a multi-threaded runtime" + ) + } } } } @@ -800,6 +811,32 @@ mod tests { .expect("wait_shutdown_done should return immediately with no workers"); } + /// On a current-thread host runtime, `wait_shutdown_done` would park the only executor + /// thread and starve the spawned shutdown tasks. It must reject up front instead of + /// timing out. + #[test] + fn test_borrowed_wait_shutdown_done_current_thread_rejected() { + let host = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let rt = BorrowedRuntime::from_handle(host.handle().clone()); + + let (worker, _receiver) = make_test_worker(); + let _ = rt.spawn_worker(worker, true).unwrap(); + + rt.trigger_shutdown_signal() + .expect("trigger_shutdown_signal failed"); + + let start = std::time::Instant::now(); + let err = rt + .wait_shutdown_done(Duration::from_secs(5)) + .expect_err("wait_shutdown_done must reject on a current-thread host"); + assert!(matches!(err, SharedRuntimeError::WouldDeadlock)); + // The check must be a fast rejection, not a timeout-wait. + assert!(start.elapsed() < Duration::from_secs(1)); + } + /// After either shutdown path runs, `spawn_worker` must reject further workers. 
#[test] fn test_borrowed_spawn_after_shutdown_rejected() { From df92857a987686cf7b4a0780750f4349fbdd03ba Mon Sep 17 00:00:00 2001 From: Jules Wiriath Date: Fri, 29 May 2026 16:43:45 +0200 Subject: [PATCH 3/3] doc: fix documentation saying it's safe to a caller responsibility to not call this on executor thread --- .../src/shared_runtime/borrowed.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/libdd-shared-runtime/src/shared_runtime/borrowed.rs b/libdd-shared-runtime/src/shared_runtime/borrowed.rs index b1d6ee74ad..4e557c5b76 100644 --- a/libdd-shared-runtime/src/shared_runtime/borrowed.rs +++ b/libdd-shared-runtime/src/shared_runtime/borrowed.rs @@ -10,8 +10,15 @@ //! runtime or tasks we don't own. Workers *we* spawned can still be stopped via //! [`BorrowedRuntime::shutdown_async`] or the non-blocking //! [`trigger_shutdown_signal`](BorrowedRuntime::trigger_shutdown_signal) + -//! [`wait_shutdown_done`](BorrowedRuntime::wait_shutdown_done) pair (safe from inside the host -//! runtime). +//! [`wait_shutdown_done`](BorrowedRuntime::wait_shutdown_done) pair. +//! +//! [`wait_shutdown_done`](BorrowedRuntime::wait_shutdown_done) must be called from a thread that +//! is **not** an executor thread of the host runtime: it parks the calling thread on a condvar +//! while the shutdown tasks run on the host runtime's executor threads. Calling it from an +//! executor thread starves those tasks. A current-thread runtime is detected and rejected +//! eagerly; a multi-thread runtime configured with a single worker thread produces the same +//! deadlock but cannot be detected through the tokio `Handle` API — that case is a caller +//! responsibility. //! //! Shutdown is terminal: once triggered, [`spawn_worker`](BorrowedRuntime::spawn_worker) returns //! [`SharedRuntimeError::RuntimeShuttingDown`]; create a new instance to resume. 
@@ -165,10 +172,10 @@ impl BorrowedRuntime { /// Block the calling thread until all workers from /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) complete, or `timeout` elapses. /// - /// Idempotent. Safe to call from inside the host runtime only when it is multi-threaded - /// with at least one other worker free to drive the pending shutdown tasks. Calling from - /// a current-thread runtime would block the only executor thread and starve those tasks, - /// so this is rejected up front with [`SharedRuntimeError::WouldDeadlock`]. + /// Idempotent. Must be called from a thread that is **not** an executor thread of the host + /// runtime — see the [module docs](self) for the full constraint. A current-thread host + /// runtime is detected and rejected; a single-worker multi-thread runtime is not detectable + /// and remains a caller responsibility. /// /// # Errors /// - [`SharedRuntimeError::WouldDeadlock`] if the host runtime is current-thread and there are