-
Notifications
You must be signed in to change notification settings - Fork 20
feat(shared-runtime)!: add BorrowedRuntime for caller-owned tokio runtimes #2061
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,259 @@ | ||||||||||||
| // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ | ||||||||||||
| // SPDX-License-Identifier: Apache-2.0 | ||||||||||||
|
|
||||||||||||
| //! Borrowed-mode runtime. | ||||||||||||
| //! | ||||||||||||
| //! [`BorrowedRuntime`] runs workers on a caller-owned tokio runtime (via [`Handle`]), avoiding a | ||||||||||||
| //! second tokio runtime in the process ("tokio-inside-tokio" panics). | ||||||||||||
| //! | ||||||||||||
| //! Fork hooks, synchronous `block_on`, and runtime shutdown are absent — those would touch a | ||||||||||||
| //! runtime or tasks we don't own. Workers *we* spawned can still be stopped via | ||||||||||||
| //! [`BorrowedRuntime::shutdown_async`] or the non-blocking | ||||||||||||
| //! [`trigger_shutdown_signal`](BorrowedRuntime::trigger_shutdown_signal) + | ||||||||||||
| //! [`wait_shutdown_done`](BorrowedRuntime::wait_shutdown_done) pair. | ||||||||||||
| //! | ||||||||||||
| //! [`wait_shutdown_done`](BorrowedRuntime::wait_shutdown_done) must be called from a thread that | ||||||||||||
| //! is **not** an executor thread of the host runtime: it parks the calling thread on a condvar | ||||||||||||
| //! while the shutdown tasks run on the host runtime's executor threads. Calling it from an | ||||||||||||
| //! executor thread starves those tasks. A current-thread runtime is detected and rejected | ||||||||||||
| //! eagerly; a multi-thread runtime configured with a single worker thread produces the same | ||||||||||||
| //! deadlock but cannot be detected through the tokio `Handle` API — that case is a caller | ||||||||||||
| //! responsibility. | ||||||||||||
| //! | ||||||||||||
| //! Shutdown is terminal: once triggered, [`spawn_worker`](BorrowedRuntime::spawn_worker) returns | ||||||||||||
| //! [`SharedRuntimeError::RuntimeShuttingDown`]; create a new instance to resume. | ||||||||||||
|
|
||||||||||||
| use super::pausable_worker::{self, PausableWorker}; | ||||||||||||
| use super::{BoxedWorker, SharedRuntimeError, WorkerEntry, WorkerHandle, WorkerRegistry}; | ||||||||||||
| use crate::worker::Worker; | ||||||||||||
| use libdd_common::MutexExt; | ||||||||||||
| use std::sync::{Arc, Condvar, Mutex}; | ||||||||||||
| use std::time::Duration; | ||||||||||||
| use tokio::runtime::{Handle, RuntimeFlavor}; | ||||||||||||
| use tracing::{debug, error}; | ||||||||||||
|
|
||||||||||||
| /// A runtime that runs workers on a caller-owned tokio runtime via its [`Handle`]. | ||||||||||||
| /// | ||||||||||||
| /// See the [module docs](self) for the differences versus the owned | ||||||||||||
| /// [`SharedRuntime`](super::SharedRuntime). Fields: `handle` is the caller's runtime handle that every worker and shutdown task is spawned on; `registry` holds the workers spawned through this instance; `shutdown_tracker` is the counter/condvar state shared (via `Arc`) with the tasks spawned by `trigger_shutdown_signal`. | ||||||||||||
| #[derive(Debug)] | ||||||||||||
| pub struct BorrowedRuntime { | ||||||||||||
| handle: Handle, | ||||||||||||
| registry: WorkerRegistry, | ||||||||||||
| shutdown_tracker: Arc<ShutdownTracker>, | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| impl BorrowedRuntime { | ||||||||||||
| /// Runs workers on `handle`'s runtime without taking ownership of it. Starts with a new `WorkerRegistry` and a default `ShutdownTracker` (zeroed counters, `triggered` false). | ||||||||||||
| pub fn from_handle(handle: Handle) -> Self { | ||||||||||||
| Self { | ||||||||||||
| handle, | ||||||||||||
| registry: WorkerRegistry::new(), | ||||||||||||
| shutdown_tracker: Arc::new(ShutdownTracker::default()), | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Returns a clone of the caller-supplied tokio runtime handle. | ||||||||||||
| pub fn runtime_handle(&self) -> Handle { | ||||||||||||
| self.handle.clone() | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Spawn a [`Worker`] on the borrowed runtime. | ||||||||||||
| /// | ||||||||||||
| /// `restart_on_fork` is accepted for API symmetry and recorded on the worker's registry entry, but nothing in borrowed mode acts on it: there are no fork hooks. | ||||||||||||
| /// | ||||||||||||
| /// # Errors | ||||||||||||
| /// Returns [`SharedRuntimeError::RuntimeShuttingDown`] after shutdown; other errors if the | ||||||||||||
| /// worker fails to start. Returns [`SharedRuntimeError::LockFailed`] if the shutdown tracker's state mutex is poisoned. NOTE(review): the workers mutex is taken with `lock_or_panic`, which presumably panics on poison instead of returning an error — confirm against `MutexExt`. | ||||||||||||
| pub fn spawn_worker<T: Worker + Sync + 'static>( | ||||||||||||
| &self, | ||||||||||||
| worker: T, | ||||||||||||
| restart_on_fork: bool, | ||||||||||||
| ) -> Result<WorkerHandle, SharedRuntimeError> { | ||||||||||||
| // Lock order: state → workers (matches both shutdown paths), so checking `triggered` | ||||||||||||
| // under state then acquiring workers is race-free with concurrent triggers. | ||||||||||||
| let state = self.shutdown_tracker.state.lock().map_err(|e| { | ||||||||||||
| SharedRuntimeError::LockFailed(format!("shutdown tracker state mutex poisoned: {e}")) | ||||||||||||
| })?; | ||||||||||||
| if state.triggered { | ||||||||||||
| return Err(SharedRuntimeError::RuntimeShuttingDown); | ||||||||||||
| } | ||||||||||||
| let mut workers_guard = self.registry.workers.lock_or_panic(); | ||||||||||||
| drop(state); // safe: a concurrent trigger still needs workers lock we hold | ||||||||||||
|
|
||||||||||||
| let boxed_worker: BoxedWorker = Box::new(worker); | ||||||||||||
| debug!(?boxed_worker, "Spawning worker on BorrowedRuntime"); | ||||||||||||
| let mut pausable_worker = PausableWorker::new(boxed_worker); | ||||||||||||
| pausable_worker.start(pausable_worker::tokio_spawn_fn(&self.handle))?; | ||||||||||||
| let worker_id = self.registry.next_id(); | ||||||||||||
| workers_guard.push(WorkerEntry { | ||||||||||||
| id: worker_id, | ||||||||||||
| restart_on_fork, | ||||||||||||
| worker: pausable_worker, | ||||||||||||
| }); | ||||||||||||
| Ok(self.registry.worker_handle(worker_id)) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Pause and shut down every worker this runtime spawned; marks shutdown as triggered. | ||||||||||||
| /// | ||||||||||||
| /// Does not affect host-runtime tasks. Worker errors are logged. Use either this or the | ||||||||||||
| /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) pair — not both. This path only sets `triggered`; it never updates the tracker's `expected`/`completed` counters, so [`wait_shutdown_done`](Self::wait_shutdown_done) has nothing to wait on for it. | ||||||||||||
| pub async fn shutdown_async(&self) { | ||||||||||||
| // Set triggered before draining so concurrent spawn_worker calls are rejected first. | ||||||||||||
| // Poison-tolerant: this function doesn't fail. | ||||||||||||
| { | ||||||||||||
| let mut state = self | ||||||||||||
| .shutdown_tracker | ||||||||||||
| .state | ||||||||||||
| .lock() | ||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are potentially locking a sync mutex in a tokio runtime, which might block the executor's thread. Is that ok in the current context (and if yes, why?)? I'm not sure to have the bigger picture. Are the other mutex-locking functions like |
||||||||||||
| .unwrap_or_else(|poison| poison.into_inner()); | ||||||||||||
| state.triggered = true; | ||||||||||||
| } | ||||||||||||
| self.registry.shutdown_async().await; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Mark shutdown as triggered and spawn fire-and-forget pause+shutdown tasks for every | ||||||||||||
| /// registered worker, without blocking. | ||||||||||||
| /// | ||||||||||||
| /// Pairs with [`wait_shutdown_done`](Self::wait_shutdown_done) for synchronous contexts | ||||||||||||
| /// where `block_on` would deadlock. | ||||||||||||
| /// | ||||||||||||
| /// # Errors | ||||||||||||
| /// Returns [`SharedRuntimeError::LockFailed`] if a mutex was poisoned. | ||||||||||||
| pub fn trigger_shutdown_signal(&self) -> Result<(), SharedRuntimeError> { | ||||||||||||
| let handle = self.handle.clone(); | ||||||||||||
|
|
||||||||||||
| // State → workers lock order: the registry is drained and `triggered` is set while the | ||||||||||||
| // state lock is still held, so a concurrent spawn_worker can't slip through. | ||||||||||||
| let mut state = self.shutdown_tracker.state.lock().map_err(|e| { | ||||||||||||
| SharedRuntimeError::LockFailed(format!("shutdown tracker state mutex poisoned: {e}")) | ||||||||||||
| })?; | ||||||||||||
| let workers = { | ||||||||||||
| let mut workers_lock = self.registry.workers.lock().map_err(|e| { | ||||||||||||
| SharedRuntimeError::LockFailed(format!("workers mutex poisoned: {e}")) | ||||||||||||
| })?; | ||||||||||||
| std::mem::take(&mut *workers_lock) | ||||||||||||
| }; | ||||||||||||
| let count = workers.len(); | ||||||||||||
| state.triggered = true; | ||||||||||||
| state.expected += count; | ||||||||||||
| drop(state); | ||||||||||||
|
|
||||||||||||
| // Wake any waiter blocked in wait_shutdown_done before trigger was called. | ||||||||||||
| if count == 0 { | ||||||||||||
| self.shutdown_tracker.cv.notify_all(); | ||||||||||||
| return Ok(()); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| for mut entry in workers { | ||||||||||||
| let tracker = self.shutdown_tracker.clone(); | ||||||||||||
| handle.spawn(async move { | ||||||||||||
| if let Err(e) = entry.worker.pause().await { | ||||||||||||
| error!("Worker failed to pause on shutdown trigger: {:?}", e); | ||||||||||||
| } else { | ||||||||||||
| entry.worker.shutdown().await; | ||||||||||||
| } | ||||||||||||
| // Poison-tolerant: panicking would skip the counter bump and block | ||||||||||||
| // wait_shutdown_done. The bump below runs whether or not the pause succeeded. | ||||||||||||
| let mut state = match tracker.state.lock() { | ||||||||||||
| Ok(guard) => guard, | ||||||||||||
| Err(poison) => { | ||||||||||||
| error!("Shutdown tracker state mutex poisoned; counters may be inaccurate"); | ||||||||||||
| poison.into_inner() | ||||||||||||
| } | ||||||||||||
| }; | ||||||||||||
| state.completed += 1; | ||||||||||||
| tracker.cv.notify_all(); | ||||||||||||
| }); | ||||||||||||
| } | ||||||||||||
| Ok(()) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Block the calling thread until all workers from | ||||||||||||
| /// [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) complete, or `timeout` elapses. | ||||||||||||
| /// | ||||||||||||
| /// Idempotent. Must be called from a thread that is **not** an executor thread of the host | ||||||||||||
| /// runtime — see the [module docs](self) for the full constraint. A current-thread host | ||||||||||||
| /// runtime is detected and rejected; a single-worker multi-thread runtime is not detectable | ||||||||||||
| /// and remains a caller responsibility. | ||||||||||||
| /// | ||||||||||||
| /// # Errors | ||||||||||||
| /// - [`SharedRuntimeError::WouldDeadlock`] if the host runtime is current-thread and there are | ||||||||||||
| /// still outstanding shutdowns to wait on. | ||||||||||||
| /// - [`SharedRuntimeError::ShutdownTimedOut`] on timeout. | ||||||||||||
| pub fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), SharedRuntimeError> { | ||||||||||||
| // Poison-tolerant: counter state has no invariants a panicking holder could break. | ||||||||||||
| let state = self | ||||||||||||
| .shutdown_tracker | ||||||||||||
| .state | ||||||||||||
| .lock() | ||||||||||||
| .unwrap_or_else(|poison| poison.into_inner()); | ||||||||||||
|
|
||||||||||||
| // Fast path: nothing to wait on (no workers triggered, or all already completed). | ||||||||||||
| if state.completed >= state.expected { | ||||||||||||
| return Ok(()); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // Tasks spawned by trigger_shutdown_signal run on `self.handle`. If that runtime is | ||||||||||||
| // current-thread, parking this thread on the condvar starves them and we deadlock | ||||||||||||
| // until `timeout` fires; reject the call instead. | ||||||||||||
| if self.handle.runtime_flavor() == RuntimeFlavor::CurrentThread { | ||||||||||||
| return Err(SharedRuntimeError::WouldDeadlock); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| let (_state, res) = self | ||||||||||||
| .shutdown_tracker | ||||||||||||
| .cv | ||||||||||||
| .wait_timeout_while(state, timeout, |s| s.completed < s.expected) | ||||||||||||
|
Aaalibaba42 marked this conversation as resolved.
Comment on lines
+204
to
+207
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I've changed the documentation to better reflect this as a caller responsibility.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this kind of requirement is practical. If the borrowed runtime is, downstream, some random client's app tokio runtime, do we really have any control over how it is configured? I'm not sure how |
||||||||||||
| .unwrap_or_else(|err| err.into_inner()); | ||||||||||||
| if res.timed_out() { | ||||||||||||
| Err(SharedRuntimeError::ShutdownTimedOut(timeout)) | ||||||||||||
| } else { | ||||||||||||
| Ok(()) | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| impl Drop for BorrowedRuntime { | ||||||||||||
| /// Fire-and-forget pause for any still-running workers; never blocks. | ||||||||||||
| /// | ||||||||||||
| /// For a guaranteed drain, call [`trigger_shutdown_signal`](Self::trigger_shutdown_signal) + | ||||||||||||
| /// [`wait_shutdown_done`](Self::wait_shutdown_done) before dropping. Unlike those paths, drop only calls `pause()` on each remaining worker (no `shutdown()`), and neither sets `triggered` nor touches the tracker counters. A poisoned workers mutex is recovered rather than propagated. | ||||||||||||
| fn drop(&mut self) { | ||||||||||||
| let workers = { | ||||||||||||
| let mut guard = match self.registry.workers.lock() { | ||||||||||||
| Ok(g) => g, | ||||||||||||
| Err(poison) => poison.into_inner(), | ||||||||||||
| }; | ||||||||||||
|
Comment on lines
+224
to
+227
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit
Suggested change
|
||||||||||||
| std::mem::take(&mut *guard) | ||||||||||||
| }; | ||||||||||||
| if workers.is_empty() { | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
| let handle = self.handle.clone(); | ||||||||||||
| for mut entry in workers { | ||||||||||||
| handle.spawn(async move { | ||||||||||||
| if let Err(e) = entry.worker.pause().await { | ||||||||||||
| debug!("Worker failed to pause during borrowed-mode Drop: {:?}", e); | ||||||||||||
| } | ||||||||||||
| }); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Condvar-based completion tracker for [`BorrowedRuntime::trigger_shutdown_signal`]. `state` holds the counters and the `triggered` flag behind a mutex; `cv` is notified each time a shutdown task bumps `completed` (and when a trigger finds no workers), and is what `wait_shutdown_done` parks on. | ||||||||||||
| #[derive(Debug, Default)] | ||||||||||||
| struct ShutdownTracker { | ||||||||||||
| state: Mutex<ShutdownState>, | ||||||||||||
| cv: Condvar, | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| #[derive(Debug, Default)] | ||||||||||||
| struct ShutdownState { | ||||||||||||
| /// Total expected completions; `trigger_shutdown_signal` adds the number of workers it drained from the registry. | ||||||||||||
| expected: usize, | ||||||||||||
| /// Shutdown tasks that have finished; incremented once per task spawned by `trigger_shutdown_signal`, even if the worker's pause failed. | ||||||||||||
| completed: usize, | ||||||||||||
| /// Set by either shutdown path (`shutdown_async` or `trigger_shutdown_signal`); `spawn_worker` checks it under the state lock and rejects new workers once set. | ||||||||||||
| triggered: bool, | ||||||||||||
| } | ||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most `Handle` methods take `&self`. Any reason to do a preventive clone here, instead of having a getter and letting the caller decide to clone if needed?