From a8618b280463835888eceaa5d2672625a6702bdc Mon Sep 17 00:00:00 2001 From: Jules Wiriath Date: Fri, 29 May 2026 18:31:50 +0200 Subject: [PATCH] feat(data-pipeline): move the async boundary up --- .../src/trace_exporter/builder.rs | 69 +++++-- libdd-data-pipeline/src/trace_exporter/mod.rs | 180 +++++++----------- .../src/trace_exporter/stats.rs | 27 ++- 3 files changed, 128 insertions(+), 148 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index c9430eadf2..937b60c1ca 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -286,8 +286,33 @@ impl TraceExporterBuilder { self } - #[allow(missing_docs)] + /// Build the [`TraceExporter`] synchronously. + /// + /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. + /// Not available on wasm — use [`Self::build_async`] there. + #[cfg(not(target_arch = "wasm32"))] pub fn build( + mut self, + ) -> Result, TraceExporterError> { + let shared_runtime = match self.shared_runtime.as_ref() { + Some(rt) => rt.clone(), + None => { + let rt = Self::new_shared_runtime()?; + self.shared_runtime = Some(rt.clone()); + rt + } + }; + shared_runtime.block_on(self.build_async::())? + } + + /// Build the [`TraceExporter`] asynchronously. + /// + /// Awaits all async setup (e.g. telemetry start-up). `C::new_client()` is invoked + /// inside `shared_runtime`'s tokio context via a scoped `EnterGuard`, so the caller + /// does not have to already be on that runtime. + pub async fn build_async< + C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, + >( self, ) -> Result, TraceExporterError> { if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) { @@ -300,9 +325,7 @@ impl TraceExporterBuilder { let shared_runtime = match self.shared_runtime { Some(rt) => rt, - None => Arc::new(SharedRuntime::new().map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) - })?), + None => Self::new_shared_runtime()?, }; let dogstatsd = self.dogstatsd_url.and_then(|u| { @@ -318,18 +341,20 @@ impl TraceExporterBuilder { let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION")); - // On native, `C::new_client()` may capture `tokio::runtime::Handle::current()` - // internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context - // so that handle is available. On wasm this is a no-op — the JS event loop is - // always the implicit executor. - #[cfg(not(target_arch = "wasm32"))] - let _guard = shared_runtime - .runtime_handle() - .map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) - })? - .enter(); - let capabilities = C::new_client(); + // Enter the runtime context so `C::new_client()` can capture `Handle::current()`. + // Scoped block drops the `!Send` EnterGuard before any `.await`. + let capabilities = { + #[cfg(not(target_arch = "wasm32"))] + let _guard = shared_runtime + .runtime_handle() + .map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + })? + .enter(); + C::new_client() + }; // --- Platform-specific worker setup --- // The blocks below spawn background workers via `SharedRuntime`. On @@ -400,11 +425,7 @@ impl TraceExporterBuilder { e.to_string(), )) })?; - shared_runtime.block_on(client_tel.start()).map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( - e.to_string(), - )) - })?; + client_tel.start().await; (Some(client_tel), Some(handle)) } Some(Err(e)) => return Err(e), @@ -498,6 +519,12 @@ impl TraceExporterBuilder { }) } + fn new_shared_runtime() -> Result, TraceExporterError> { + SharedRuntime::new().map(Arc::new).map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) + }) + } + fn is_inputs_outputs_formats_compatible( input: TraceExporterInputFormat, output: TraceExporterOutputFormat, diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 7e3dd1c951..7c5b5b042b 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -211,29 +211,34 @@ impl Tra /// Stop the background workers owned by this exporter. /// - /// Only the workers spawned for this exporter are stopped. Workers from other components - /// sharing the same [`SharedRuntime`] are unaffected. + /// Sync facade over [`Self::shutdown_async`]; panics inside an existing tokio context. + /// Workers from other components on the same [`SharedRuntime`] are unaffected. /// /// # Errors - /// Returns [`SharedRuntimeError::ShutdownTimedOut`] if a timeout was given and elapsed before - /// all workers finished. + /// Returns [`TraceExporterError::Shutdown(ShutdownError::TimedOut)`] if a timeout was + /// given and elapsed before all workers finished. #[cfg(not(target_arch = "wasm32"))] pub fn shutdown(self, timeout: Option) -> Result<(), TraceExporterError> { let runtime = self.shared_runtime.clone(); + runtime.block_on(self.shutdown_async(timeout))? + } + + /// Async version of [`Self::shutdown`]. + /// + /// # Errors + /// Returns [`TraceExporterError::Shutdown(ShutdownError::TimedOut)`] if a timeout was + /// given and elapsed before all workers finished. + #[cfg(not(target_arch = "wasm32"))] + pub async fn shutdown_async(self, timeout: Option) -> Result<(), TraceExporterError> { if let Some(timeout) = timeout { - match runtime - .block_on(async { tokio::time::timeout(timeout, self.shutdown_workers()).await }) - .map_err(TraceExporterError::Io)? - { + match tokio::time::timeout(timeout, self.shutdown_workers()).await { Ok(()) => Ok(()), Err(_) => Err(TraceExporterError::Shutdown(ShutdownError::TimedOut( timeout, ))), } } else { - runtime - .block_on(self.shutdown_workers()) - .map_err(TraceExporterError::Io)?; + self.shutdown_workers().await; Ok(()) } } @@ -271,34 +276,22 @@ impl Tra let _ = self; } - /// Send msgpack serialized traces to the agent - /// - /// # Arguments + /// Send msgpack serialized traces to the agent. /// - /// * data: A slice containing the serialized traces. This slice should be encoded following the - /// input_format passed to the TraceExporter on creating. - /// - /// # Returns - /// * Ok(AgentResponse): The response from the agent - /// * Err(TraceExporterError): An error detailing what went wrong in the process + /// Sync facade over [`Self::send_async`]; panics inside an existing tokio context. + /// `data` must be encoded per the `input_format` given to the builder. Returns the + /// agent response on success. #[cfg(not(target_arch = "wasm32"))] pub fn send(&self, data: &[u8]) -> Result { - self.check_agent_info(); - - let res = self.send_deser(data, self.input_format.into())?; - if matches!(&res, AgentResponse::Changed { body } if body.is_empty()) { - return Err(TraceExporterError::Agent( - error::AgentErrorKind::EmptyResponse, - )); - } - - Ok(res) + self.shared_runtime.block_on(self.send_async(data))? } - /// **WARNING**: This method is experimental and should not be used for production. - /// Async version of [`Self::send`] for platforms that cannot use `block_on` (e.g. wasm) + /// Send msgpack serialized traces to the agent. + /// + /// `data` must be encoded per the `input_format` given to the builder. + /// [`Self::send`] is the sync facade over this method. pub async fn send_async(&self, data: &[u8]) -> Result { - self.check_agent_info(); + self.check_agent_info().await; let format: DeserInputFormat = self.input_format.into(); @@ -344,48 +337,50 @@ impl Tra } #[cfg(not(target_arch = "wasm32"))] - fn check_agent_info(&self) { - if let Some(agent_info) = agent_info::get_agent_info() { - if self.has_agent_info_state_changed(&agent_info) { - match &**self.client_side_stats.status.load() { - StatsComputationStatus::Disabled => {} - StatsComputationStatus::DisabledByAgent { .. } => { - let ctx = stats::StatsContext { - metadata: &self.metadata, - endpoint_url: &self.endpoint.url, - shared_runtime: &self.shared_runtime, - }; - stats::handle_stats_disabled_by_agent( - &ctx, - &agent_info, - self.capabilities.clone(), - &self.client_side_stats, - ); - } - StatsComputationStatus::Enabled { - stats_concentrator, .. - } => { - let ctx = stats::StatsContext { - metadata: &self.metadata, - endpoint_url: &self.endpoint.url, - shared_runtime: &self.shared_runtime, - }; - stats::handle_stats_enabled( - &ctx, - &agent_info, - stats_concentrator, - &self.client_side_stats, - ); - } - } - self.previous_info_state - .store(Some(agent_info.state_hash.clone().into())) + /// Reconcile in-process stats state with the latest agent info. + /// Async so the `Enabled` arm can await a stats-worker shutdown without `block_on`. + async fn check_agent_info(&self) { + let Some(agent_info) = agent_info::get_agent_info() else { + return; + }; + if !self.has_agent_info_state_changed(&agent_info) { + return; + } + + // load_full() avoids holding an ArcSwap Guard (!Send) across .await. + let status = self.client_side_stats.status.load_full(); + match &*status { + StatsComputationStatus::Disabled => {} + StatsComputationStatus::DisabledByAgent { .. } => { + let ctx = stats::StatsContext { + metadata: &self.metadata, + endpoint_url: &self.endpoint.url, + shared_runtime: &self.shared_runtime, + }; + stats::handle_stats_disabled_by_agent( + &ctx, + &agent_info, + self.capabilities.clone(), + &self.client_side_stats, + ); + } + StatsComputationStatus::Enabled { + stats_concentrator, .. + } => { + stats::handle_stats_enabled( + &agent_info, + stats_concentrator, + &self.client_side_stats, + ) + .await; } } + self.previous_info_state + .store(Some(agent_info.state_hash.clone().into())) } #[cfg(target_arch = "wasm32")] - fn check_agent_info(&self) { + async fn check_agent_info(&self) { // No background workers on wasm — agent info is never fetched, stats are // never computed. This is intentionally a no-op. } @@ -437,20 +432,15 @@ impl Tra /// Send a list of trace chunks to the agent (or OTLP endpoint when configured). /// - /// # Arguments - /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans. - /// - /// # Returns - /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) - /// * Err(TraceExporterError): An error detailing what went wrong in the process + /// Sync facade over [`Self::send_trace_chunks_async`]; panics inside an existing + /// tokio context. Returns the agent response (or `Unchanged` for OTLP). #[cfg(not(target_arch = "wasm32"))] pub fn send_trace_chunks( &self, trace_chunks: Vec>>, ) -> Result { - self.check_agent_info(); self.shared_runtime - .block_on(async { self.send_trace_chunks_inner(trace_chunks).await })? + .block_on(self.send_trace_chunks_async(trace_chunks))? } /// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured). @@ -465,7 +455,7 @@ impl Tra &self, trace_chunks: Vec>>, ) -> Result { - self.check_agent_info(); + self.check_agent_info().await; self.send_trace_chunks_inner(trace_chunks).await } @@ -500,38 +490,6 @@ impl Tra Ok(AgentResponse::Unchanged) } - /// Deserializes, processes and sends trace chunks to the agent - #[cfg(not(target_arch = "wasm32"))] - fn send_deser( - &self, - data: &[u8], - format: DeserInputFormat, - ) -> Result { - let (traces, _) = match format { - DeserInputFormat::V04 => msgpack_decoder::v04::from_slice(data), - DeserInputFormat::V05 => msgpack_decoder::v05::from_slice(data), - } - .map_err(|e| { - error!("Error deserializing trace from request body: {e}"); - self.emit_metric( - HealthMetric::Count(health_metrics::DESERIALIZE_TRACES_ERRORS, 1), - None, - ); - TraceExporterError::Deserialization(e) - })?; - debug!( - trace_count = traces.len(), - "Trace deserialization completed successfully" - ); - self.emit_metric( - HealthMetric::Count(health_metrics::DESERIALIZE_TRACES, traces.len() as i64), - None, - ); - - self.shared_runtime - .block_on(async { self.send_trace_chunks_inner(traces).await })? - } - /// Send traces payload to agent with retry and telemetry reporting async fn send_traces_with_telemetry( &self, diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 7f1aaea36a..3c2fec6cae 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -166,27 +166,22 @@ fn create_and_start_stats_worker< } #[cfg(not(target_arch = "wasm32"))] -/// Stops the stats exporter and disable stats computation -/// -/// Used when client-side stats is disabled by the agent -pub(crate) fn stop_stats_computation( - ctx: &StatsContext, - client_side_stats: &ArcSwap, -) { +/// Transition from `Enabled` to `DisabledByAgent`, awaiting the stats worker shutdown. +pub(crate) async fn stop_stats_computation(client_side_stats: &ArcSwap) { + // load_full() avoids holding an ArcSwap Guard (!Send) across .await. + let snapshot = client_side_stats.load_full(); if let StatsComputationStatus::Enabled { stats_concentrator, worker_handle, .. - } = &**client_side_stats.load() + } = &*snapshot { let bucket_size = stats_concentrator.lock_or_panic().get_bucket_size(); client_side_stats.store(Arc::new(StatsComputationStatus::DisabledByAgent { bucket_size, })); - match ctx.shared_runtime.block_on(worker_handle.clone().stop()) { - Ok(Err(e)) => error!("Failed to stop stats worker: {e}"), - Err(e) => error!("Failed to stop stats worker: {e}"), - _ => {} + if let Err(e) = worker_handle.clone().stop().await { + error!("Failed to stop stats worker: {e}"); } } } @@ -254,9 +249,9 @@ fn update_obfuscation_config( } #[cfg(not(target_arch = "wasm32"))] -/// Handle stats computation when it's already enabled -pub(crate) fn handle_stats_enabled( - ctx: &StatsContext, +/// Reconcile stats config when computation is already enabled. +/// Async because the disabled-by-agent branch awaits [`stop_stats_computation`]. +pub(crate) async fn handle_stats_enabled( agent_info: &Arc, stats_concentrator: &Arc>, client_side_stats: &StatsComputationConfig, @@ -268,7 +263,7 @@ pub(crate) fn handle_stats_enabled( #[cfg(feature = "stats-obfuscation")] update_obfuscation_config(agent_info, client_side_stats); } else { - stop_stats_computation(ctx, &client_side_stats.status); + stop_stats_computation(&client_side_stats.status).await; debug!("Client-side stats computation has been disabled by the agent") } }