diff --git a/libdd-data-pipeline-ffi/Cargo.toml b/libdd-data-pipeline-ffi/Cargo.toml index 758788ae90..be8f6839f9 100644 --- a/libdd-data-pipeline-ffi/Cargo.toml +++ b/libdd-data-pipeline-ffi/Cargo.toml @@ -36,4 +36,6 @@ libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" } libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false } libdd-tinybytes = { path = "../libdd-tinybytes" } libdd-trace-utils = { path = "../libdd-trace-utils" } +tokio = { version = "1.23", features = ["macros"] } +tokio-util = "0.7.11" tracing = { version = "0.1", default-features = false } diff --git a/libdd-data-pipeline-ffi/cbindgen.toml b/libdd-data-pipeline-ffi/cbindgen.toml index a0fe97f467..b5879c3018 100644 --- a/libdd-data-pipeline-ffi/cbindgen.toml +++ b/libdd-data-pipeline-ffi/cbindgen.toml @@ -32,6 +32,7 @@ exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks"] "TracerSpan" = "ddog_TracerSpan" "TracerSpanFields" = "ddog_TracerSpanFields" "TracerTraceChunks" = "ddog_TracerTraceChunks" +"Handle_TokioCancellationToken" = "ddog_TraceExporterCancelToken" [export.mangle] rename_types = "PascalCase" diff --git a/libdd-data-pipeline-ffi/src/tracer.rs b/libdd-data-pipeline-ffi/src/tracer.rs index 97d05dbe47..65482224c4 100644 --- a/libdd-data-pipeline-ffi/src/tracer.rs +++ b/libdd-data-pipeline-ffi/src/tracer.rs @@ -13,12 +13,15 @@ use crate::error::{ExporterError, ExporterErrorCode as ErrorCode}; use crate::response::ExporterResponse; use crate::trace_exporter::TraceExporter; use crate::{catch_panic, gen_error}; +use libdd_common_ffi::handle::{Handle, ToInner}; use libdd_common_ffi::slice::AsBytes; use libdd_common_ffi::CharSlice; use libdd_tinybytes::BytesString; use libdd_trace_utils::span::v04::SpanBytes; use std::ptr::NonNull; +type TokioCancellationToken = tokio_util::sync::CancellationToken; + // --------------------------------------------------------------------------- // Helper // --------------------------------------------------------------------------- @@ -295,6 +298,53 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span( ) } +// --------------------------------------------------------------------------- +// Cancellation token +// --------------------------------------------------------------------------- + +/// Create a new cancellation token. +/// +/// The returned handle must be freed with +/// [`ddog_trace_exporter_cancel_token_drop`]. +#[no_mangle] +pub extern "C" fn ddog_trace_exporter_cancel_token_new() -> Handle { + Handle::from(TokioCancellationToken::new()) +} + +/// Cancel a cancellation token. +/// +/// All clones of the same token observe the cancellation. If the token is +/// currently passed to [`ddog_trace_exporter_send_trace_chunks`], the +/// in-flight HTTP request will be aborted cooperatively. +/// +/// # Safety +/// +/// * `token` must point to a valid [`Handle`] returned by +/// [`ddog_trace_exporter_cancel_token_new`]. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_cancel_token_cancel( + mut token: *mut Handle, +) { + if let Ok(inner) = token.to_inner_mut() { + inner.cancel(); + } +} + +/// Free a cancellation token handle. +/// +/// After this call the pointer is invalid and must not be reused. +/// +/// # Safety +/// +/// * `token` must point to a valid [`Handle`] returned by +/// [`ddog_trace_exporter_cancel_token_new`], or be null. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_cancel_token_drop( + mut token: *mut Handle, +) { + drop(token.take()); +} + // --------------------------------------------------------------------------- // Send trace chunks // --------------------------------------------------------------------------- @@ -305,6 +355,12 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span( /// serializes in the configured output format, and sends to the agent /// with retry logic. /// +/// When `cancel` is non-null it must point to a live +/// [`Handle`] obtained from +/// [`ddog_trace_exporter_cancel_token_new`]. If the token is cancelled +/// while the send is in progress the HTTP request is aborted and an +/// error with code [`ExporterErrorCode::IoError`] is returned. +/// /// On success, if `response_out` is non-null, a heap-allocated /// [`ExporterResponse`] is written there. The caller owns it and must /// free it with `ddog_trace_exporter_response_free`. @@ -315,11 +371,13 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span( /// * `chunks` is consumed and must not be used after this call. /// * If `response_out` is non-null it must point to valid writable memory for a /// `Box`. +/// * If `cancel` is non-null it must point to a valid cancellation token handle. #[no_mangle] pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks( exporter: Option<&TraceExporter>, chunks: Option>, response_out: Option>>, + mut cancel: *mut Handle, ) -> Option> { let Some(exporter) = exporter else { return gen_error!(ErrorCode::InvalidArgument); @@ -328,15 +386,132 @@ pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks( return gen_error!(ErrorCode::InvalidArgument); }; + // Clone the cancellation token (if provided) so the caller retains + // ownership of the handle while we use a cheap clone inside select!. + let cancel_token: Option = if cancel.is_null() { + None + } else { + cancel.to_inner_mut().ok().map(|t| t.clone()) + }; + catch_panic!( - match exporter.send_trace_chunks(chunks.0) { - Ok(resp) => { - if let Some(out) = response_out { - out.as_ptr().write(Box::new(ExporterResponse::from(resp))); + { + let result = if let Some(ct) = cancel_token { + // Use select! so we can abort the in-flight request when + // the caller cancels. + let block_result = exporter.shared_runtime().block_on(async { + tokio::select! { + res = exporter.send_trace_chunks_async(chunks.0) => res, + _ = ct.cancelled() => Err( + std::io::Error::new( + std::io::ErrorKind::Interrupted, + "send cancelled via cancellation token", + ).into() + ), + } + }); + match block_result { + Ok(inner) => inner, + Err(io_err) => Err(io_err.into()), + } + } else { + exporter.send_trace_chunks(chunks.0) + }; + + match result { + Ok(resp) => { + if let Some(out) = response_out { + out.as_ptr().write(Box::new(ExporterResponse::from(resp))); + } + None } - None + Err(e) => Some(Box::new(ExporterError::from(e))), } - Err(e) => Some(Box::new(ExporterError::from(e))), + }, + gen_error!(ErrorCode::Panic) + ) +} + +// --------------------------------------------------------------------------- +// Fork safety hooks +// --------------------------------------------------------------------------- + +/// Must be called in the parent process before `fork()`. +/// +/// Pauses all workers on the exporter's [`SharedRuntime`] so that no +/// background threads are running during the fork. +/// +/// # Safety +/// +/// * `exporter` must be a valid `TraceExporter` pointer or null. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_before_fork( + exporter: Option<&TraceExporter>, +) -> Option> { + let Some(exporter) = exporter else { + return gen_error!(ErrorCode::InvalidArgument); + }; + + catch_panic!( + { + exporter.shared_runtime().before_fork(); + None + }, + gen_error!(ErrorCode::Panic) + ) +} + +/// Must be called in the parent process after `fork()`. +/// +/// Restarts workers that were paused by +/// [`ddog_trace_exporter_before_fork`]. +/// +/// # Safety +/// +/// * `exporter` must be a valid `TraceExporter` pointer or null. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_after_fork_in_parent( + exporter: Option<&TraceExporter>, +) -> Option> { + let Some(exporter) = exporter else { + return gen_error!(ErrorCode::InvalidArgument); + }; + + catch_panic!( + match exporter.shared_runtime().after_fork_parent() { + Ok(()) => None, + Err(e) => Some(Box::new(ExporterError::new( + ErrorCode::Internal, + &e.to_string(), + ))), + }, + gen_error!(ErrorCode::Panic) + ) +} + +/// Must be called in the child process after `fork()`. +/// +/// Creates a fresh tokio runtime and restarts all workers on the +/// exporter's [`SharedRuntime`]. +/// +/// # Safety +/// +/// * `exporter` must be a valid `TraceExporter` pointer or null. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_after_fork_in_child( + exporter: Option<&TraceExporter>, +) -> Option> { + let Some(exporter) = exporter else { + return gen_error!(ErrorCode::InvalidArgument); + }; + + catch_panic!( + match exporter.shared_runtime().after_fork_child() { + Ok(()) => None, + Err(e) => Some(Box::new(ExporterError::new( + ErrorCode::Internal, + &e.to_string(), + ))), }, gen_error!(ErrorCode::Panic) ) @@ -651,7 +826,12 @@ mod tests { fn send_trace_chunks_null_exporter_returns_error() { unsafe { let chunks = make_chunks(0); - let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None); + let err = ddog_trace_exporter_send_trace_chunks( + None, + Some(chunks), + None, + std::ptr::null_mut(), + ); assert!(err.is_some()); assert_eq!(err.as_ref().unwrap().code, ErrorCode::InvalidArgument); ddog_trace_exporter_error_free(err); @@ -687,4 +867,73 @@ mod tests { ddog_tracer_trace_chunks_free(chunks); } } + + // -- Cancellation token ------------------------------------------------- + + #[test] + fn cancel_token_new_and_drop() { + unsafe { + let mut token = ddog_trace_exporter_cancel_token_new(); + let ptr: *mut Handle = &mut token; + ddog_trace_exporter_cancel_token_drop(ptr); + } + } + + #[test] + fn cancel_token_cancel() { + unsafe { + let mut token = ddog_trace_exporter_cancel_token_new(); + let ptr: *mut Handle = &mut token; + ddog_trace_exporter_cancel_token_cancel(ptr); + ddog_trace_exporter_cancel_token_drop(ptr); + } + } + + #[test] + fn send_trace_chunks_null_cancel_is_accepted() { + // Passing a null cancel pointer should behave like the old + // signature (no cancellation). + unsafe { + let chunks = make_chunks(0); + let err = ddog_trace_exporter_send_trace_chunks( + None, + Some(chunks), + None, + std::ptr::null_mut(), + ); + // exporter is None, so we get InvalidArgument, but no crash + // from the null cancel pointer. + assert!(err.is_some()); + ddog_trace_exporter_error_free(err); + } + } + + // -- Fork safety hooks -------------------------------------------------- + + #[test] + fn before_fork_null_returns_error() { + unsafe { + let err = ddog_trace_exporter_before_fork(None); + assert!(err.is_some()); + ddog_trace_exporter_error_free(err); + } + } + + #[test] + fn after_fork_in_parent_null_returns_error() { + unsafe { + let err = ddog_trace_exporter_after_fork_in_parent(None); + assert!(err.is_some()); + ddog_trace_exporter_error_free(err); + } + } + + #[test] + fn after_fork_in_child_null_returns_error() { + unsafe { + let err = ddog_trace_exporter_after_fork_in_child(None); + assert!(err.is_some()); + ddog_trace_exporter_error_free(err); + } + } } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 7e3dd1c951..d23ce37200 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -209,6 +209,11 @@ impl Tra TraceExporterBuilder::default() } + /// Returns a reference to the underlying [`SharedRuntime`]. + pub fn shared_runtime(&self) -> &SharedRuntime { + &self.shared_runtime + } + /// Stop the background workers owned by this exporter. /// /// Only the workers spawned for this exporter are stopped. Workers from other components