Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 48 additions & 21 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,33 @@ impl TraceExporterBuilder {
self
}

#[allow(missing_docs)]
/// Build the [`TraceExporter`] synchronously.
///
/// Sync facade over [`Self::build_async`]; panics inside an existing tokio context.
/// Not available on wasm — use [`Self::build_async`] there.
#[cfg(not(target_arch = "wasm32"))]
pub fn build<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static>(
mut self,
) -> Result<TraceExporter<C>, TraceExporterError> {
let shared_runtime = match self.shared_runtime.as_ref() {
Some(rt) => rt.clone(),
None => {
let rt = Self::new_shared_runtime()?;
self.shared_runtime = Some(rt.clone());
rt
}
};
shared_runtime.block_on(self.build_async::<C>())?
}

/// Build the [`TraceExporter`] asynchronously.
///
/// Awaits all async setup (e.g. telemetry start-up). `C::new_client()` is invoked
/// inside `shared_runtime`'s tokio context via a scoped `EnterGuard`, so the caller
/// does not have to already be on that runtime.
pub async fn build_async<
C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
>(
self,
) -> Result<TraceExporter<C>, TraceExporterError> {
if !Self::is_inputs_outputs_formats_compatible(self.input_format, self.output_format) {
Expand All @@ -300,9 +325,7 @@ impl TraceExporterBuilder {

let shared_runtime = match self.shared_runtime {
Some(rt) => rt,
None => Arc::new(SharedRuntime::new().map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string()))
})?),
None => Self::new_shared_runtime()?,
};

let dogstatsd = self.dogstatsd_url.and_then(|u| {
Expand All @@ -318,18 +341,20 @@ impl TraceExporterBuilder {

let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION"));

// On native, `C::new_client()` may capture `tokio::runtime::Handle::current()`
// internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context
// so that handle is available. On wasm this is a no-op — the JS event loop is
// always the implicit executor.
#[cfg(not(target_arch = "wasm32"))]
let _guard = shared_runtime
.runtime_handle()
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string()))
})?
.enter();
let capabilities = C::new_client();
// Enter the runtime context so `C::new_client()` can capture `Handle::current()`.
// Scoped block drops the `!Send` EnterGuard before any `.await`.
let capabilities = {
#[cfg(not(target_arch = "wasm32"))]
let _guard = shared_runtime
.runtime_handle()
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
})?
.enter();
C::new_client()
};

// --- Platform-specific worker setup ---
// The blocks below spawn background workers via `SharedRuntime`. On
Expand Down Expand Up @@ -400,11 +425,7 @@ impl TraceExporterBuilder {
e.to_string(),
))
})?;
shared_runtime.block_on(client_tel.start()).map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
})?;
client_tel.start().await;
(Some(client_tel), Some(handle))
}
Some(Err(e)) => return Err(e),
Expand Down Expand Up @@ -498,6 +519,12 @@ impl TraceExporterBuilder {
})
}

fn new_shared_runtime() -> Result<Arc<SharedRuntime>, TraceExporterError> {
SharedRuntime::new().map(Arc::new).map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string()))
})
}

fn is_inputs_outputs_formats_compatible(
input: TraceExporterInputFormat,
output: TraceExporterOutputFormat,
Expand Down
180 changes: 69 additions & 111 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,29 +211,34 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra

/// Stop the background workers owned by this exporter.
///
/// Only the workers spawned for this exporter are stopped. Workers from other components
/// sharing the same [`SharedRuntime`] are unaffected.
/// Sync facade over [`Self::shutdown_async`]; panics inside an existing tokio context.
/// Workers from other components on the same [`SharedRuntime`] are unaffected.
///
/// # Errors
/// Returns [`SharedRuntimeError::ShutdownTimedOut`] if a timeout was given and elapsed before
/// all workers finished.
/// Returns [`TraceExporterError::Shutdown(ShutdownError::TimedOut)`] if a timeout was
/// given and elapsed before all workers finished.
#[cfg(not(target_arch = "wasm32"))]
pub fn shutdown(self, timeout: Option<Duration>) -> Result<(), TraceExporterError> {
let runtime = self.shared_runtime.clone();
runtime.block_on(self.shutdown_async(timeout))?
}

/// Async version of [`Self::shutdown`].
///
/// # Errors
/// Returns [`TraceExporterError::Shutdown(ShutdownError::TimedOut)`] if a timeout was
/// given and elapsed before all workers finished.
#[cfg(not(target_arch = "wasm32"))]
pub async fn shutdown_async(self, timeout: Option<Duration>) -> Result<(), TraceExporterError> {
if let Some(timeout) = timeout {
match runtime
.block_on(async { tokio::time::timeout(timeout, self.shutdown_workers()).await })
.map_err(TraceExporterError::Io)?
{
match tokio::time::timeout(timeout, self.shutdown_workers()).await {
Ok(()) => Ok(()),
Err(_) => Err(TraceExporterError::Shutdown(ShutdownError::TimedOut(
timeout,
))),
}
} else {
runtime
.block_on(self.shutdown_workers())
.map_err(TraceExporterError::Io)?;
self.shutdown_workers().await;
Ok(())
}
}
Expand Down Expand Up @@ -271,34 +276,22 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
let _ = self;
}

/// Send msgpack serialized traces to the agent
///
/// # Arguments
/// Send msgpack serialized traces to the agent.
///
/// * data: A slice containing the serialized traces. This slice should be encoded following the
/// input_format passed to the TraceExporter on creating.
///
/// # Returns
/// * Ok(AgentResponse): The response from the agent
/// * Err(TraceExporterError): An error detailing what went wrong in the process
/// Sync facade over [`Self::send_async`]; panics inside an existing tokio context.
/// `data` must be encoded per the `input_format` given to the builder. Returns the
/// agent response on success.
#[cfg(not(target_arch = "wasm32"))]
pub fn send(&self, data: &[u8]) -> Result<AgentResponse, TraceExporterError> {
self.check_agent_info();

let res = self.send_deser(data, self.input_format.into())?;
if matches!(&res, AgentResponse::Changed { body } if body.is_empty()) {
return Err(TraceExporterError::Agent(
error::AgentErrorKind::EmptyResponse,
));
}

Ok(res)
self.shared_runtime.block_on(self.send_async(data))?
}

/// **WARNING**: This method is experimental and should not be used for production.
/// Async version of [`Self::send`] for platforms that cannot use `block_on` (e.g. wasm)
/// Send msgpack serialized traces to the agent.
///
/// `data` must be encoded per the `input_format` given to the builder.
/// [`Self::send`] is the sync facade over this method.
pub async fn send_async(&self, data: &[u8]) -> Result<AgentResponse, TraceExporterError> {
self.check_agent_info();
self.check_agent_info().await;

let format: DeserInputFormat = self.input_format.into();

Expand Down Expand Up @@ -344,48 +337,50 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
}

#[cfg(not(target_arch = "wasm32"))]
fn check_agent_info(&self) {
if let Some(agent_info) = agent_info::get_agent_info() {
if self.has_agent_info_state_changed(&agent_info) {
match &**self.client_side_stats.status.load() {
StatsComputationStatus::Disabled => {}
StatsComputationStatus::DisabledByAgent { .. } => {
let ctx = stats::StatsContext {
metadata: &self.metadata,
endpoint_url: &self.endpoint.url,
shared_runtime: &self.shared_runtime,
};
stats::handle_stats_disabled_by_agent(
&ctx,
&agent_info,
self.capabilities.clone(),
&self.client_side_stats,
);
}
StatsComputationStatus::Enabled {
stats_concentrator, ..
} => {
let ctx = stats::StatsContext {
metadata: &self.metadata,
endpoint_url: &self.endpoint.url,
shared_runtime: &self.shared_runtime,
};
stats::handle_stats_enabled(
&ctx,
&agent_info,
stats_concentrator,
&self.client_side_stats,
);
}
}
self.previous_info_state
.store(Some(agent_info.state_hash.clone().into()))
/// Reconcile in-process stats state with the latest agent info.
/// Async so the `Enabled` arm can await a stats-worker shutdown without `block_on`.
async fn check_agent_info(&self) {
let Some(agent_info) = agent_info::get_agent_info() else {
return;
};
if !self.has_agent_info_state_changed(&agent_info) {
return;
}

// load_full() avoids holding an ArcSwap Guard (!Send) across .await.
let status = self.client_side_stats.status.load_full();
match &*status {
StatsComputationStatus::Disabled => {}
StatsComputationStatus::DisabledByAgent { .. } => {
let ctx = stats::StatsContext {
metadata: &self.metadata,
endpoint_url: &self.endpoint.url,
shared_runtime: &self.shared_runtime,
};
stats::handle_stats_disabled_by_agent(
&ctx,
&agent_info,
self.capabilities.clone(),
&self.client_side_stats,
);
}
StatsComputationStatus::Enabled {
stats_concentrator, ..
} => {
stats::handle_stats_enabled(
&agent_info,
stats_concentrator,
&self.client_side_stats,
)
.await;
}
}
self.previous_info_state
.store(Some(agent_info.state_hash.clone().into()))
}

#[cfg(target_arch = "wasm32")]
fn check_agent_info(&self) {
async fn check_agent_info(&self) {
// No background workers on wasm — agent info is never fetched, stats are
// never computed. This is intentionally a no-op.
}
Expand Down Expand Up @@ -437,20 +432,15 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra

/// Send a list of trace chunks to the agent (or OTLP endpoint when configured).
///
/// # Arguments
/// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
///
/// # Returns
/// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP)
/// * Err(TraceExporterError): An error detailing what went wrong in the process
/// Sync facade over [`Self::send_trace_chunks_async`]; panics inside an existing
/// tokio context. Returns the agent response (or `Unchanged` for OTLP).
#[cfg(not(target_arch = "wasm32"))]
pub fn send_trace_chunks<T: TraceData>(
&self,
trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
self.check_agent_info();
self.shared_runtime
.block_on(async { self.send_trace_chunks_inner(trace_chunks).await })?
.block_on(self.send_trace_chunks_async(trace_chunks))?
}

/// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured).
Expand All @@ -465,7 +455,7 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
&self,
trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
self.check_agent_info();
self.check_agent_info().await;
self.send_trace_chunks_inner(trace_chunks).await
}

Expand Down Expand Up @@ -500,38 +490,6 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
Ok(AgentResponse::Unchanged)
}

/// Deserializes, processes and sends trace chunks to the agent
#[cfg(not(target_arch = "wasm32"))]
fn send_deser(
&self,
data: &[u8],
format: DeserInputFormat,
) -> Result<AgentResponse, TraceExporterError> {
let (traces, _) = match format {
DeserInputFormat::V04 => msgpack_decoder::v04::from_slice(data),
DeserInputFormat::V05 => msgpack_decoder::v05::from_slice(data),
}
.map_err(|e| {
error!("Error deserializing trace from request body: {e}");
self.emit_metric(
HealthMetric::Count(health_metrics::DESERIALIZE_TRACES_ERRORS, 1),
None,
);
TraceExporterError::Deserialization(e)
})?;
debug!(
trace_count = traces.len(),
"Trace deserialization completed successfully"
);
self.emit_metric(
HealthMetric::Count(health_metrics::DESERIALIZE_TRACES, traces.len() as i64),
None,
);

self.shared_runtime
.block_on(async { self.send_trace_chunks_inner(traces).await })?
}

/// Send traces payload to agent with retry and telemetry reporting
async fn send_traces_with_telemetry(
&self,
Expand Down
Loading
Loading