Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libdd-data-pipeline-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" }
libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false }
libdd-tinybytes = { path = "../libdd-tinybytes" }
libdd-trace-utils = { path = "../libdd-trace-utils" }
tokio = { version = "1.23", features = ["macros"] }
tokio-util = "0.7.11"
Comment on lines +39 to +40
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Regenerate Cargo.lock for the new dependencies

Adding tokio/tokio-util to this crate without committing the corresponding root Cargo.lock update breaks locked builds: I checked cargo check -p libdd-data-pipeline-ffi --locked, and Cargo exits with the lock file /workspace/libdatadog/Cargo.lock needs to be updated. Any CI or consumer build using the repository lockfile will fail before compiling this crate until the lock entry for libdd-data-pipeline-ffi includes these new dependencies.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member

@marcotc marcotc May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

edit: I think I was wrong, looks like a legit failure: https://github.com/DataDog/libdatadog/actions/runs/26578731968/job/78305509370?pr=2051

original comment:
This feels like a case of "AI doesn't know how "libdd-data-pipeline-ffi" is used.
This might information be worth writing down (maybe in https://github.com/DataDog/libdatadog/blob/22e92370e5d66c0efd6ac5fe949bd8b6514fd902/libdd-data-pipeline-ffi/README.md, or other place if this is common across multiple libdatadog/* crates).

tracing = { version = "0.1", default-features = false }
1 change: 1 addition & 0 deletions libdd-data-pipeline-ffi/cbindgen.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks"]
"TracerSpan" = "ddog_TracerSpan"
"TracerSpanFields" = "ddog_TracerSpanFields"
"TracerTraceChunks" = "ddog_TracerTraceChunks"
"Handle_TokioCancellationToken" = "ddog_TraceExporterCancelToken"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Declare the opaque cancellation token type in cbindgen

With only the handle rename here, cbindgen emits typedef ddog_CancellationToken ddog_TokioCancellationToken; in data-pipeline.h, but this header only includes common.h, which does not define ddog_CancellationToken; I regenerated the header and cc -fsyntax-only reports unknown type name 'ddog_CancellationToken'. C/C++ consumers that include the generated data-pipeline header will not compile when this new token type is present, so this needs the same opaque token rename/forward declaration pattern used by the profiling FFI header.

Useful? React with 👍 / 👎.


[export.mangle]
rename_types = "PascalCase"
Expand Down
263 changes: 256 additions & 7 deletions libdd-data-pipeline-ffi/src/tracer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ use crate::error::{ExporterError, ExporterErrorCode as ErrorCode};
use crate::response::ExporterResponse;
use crate::trace_exporter::TraceExporter;
use crate::{catch_panic, gen_error};
use libdd_common_ffi::handle::{Handle, ToInner};
use libdd_common_ffi::slice::AsBytes;
use libdd_common_ffi::CharSlice;
use libdd_tinybytes::BytesString;
use libdd_trace_utils::span::v04::SpanBytes;
use std::ptr::NonNull;

type TokioCancellationToken = tokio_util::sync::CancellationToken;

// ---------------------------------------------------------------------------
// Helper
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -295,6 +298,53 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span(
)
}

// ---------------------------------------------------------------------------
// Cancellation token
// ---------------------------------------------------------------------------

/// Create a new cancellation token.
///
/// The returned handle must be freed with
/// [`ddog_trace_exporter_cancel_token_drop`].
#[no_mangle]
pub extern "C" fn ddog_trace_exporter_cancel_token_new() -> Handle<TokioCancellationToken> {
Handle::from(TokioCancellationToken::new())
}

/// Cancel a cancellation token.
///
/// All clones of the same token observe the cancellation. If the token is
Copy link
Copy Markdown
Member

@marcotc marcotc May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the token was not passed to ddog_trace_exporter_send_trace_chunks, but will soon, is a cancellation: scheduled to happen; ddog_trace_exporter_send_trace_chunks won't start because it's already cancelled; nothing?
It's good to document what happens when the token is not held yet.
And also, if the token is no longer held (aka ddog_trace_exporter_send_trace_chunks is done), which I assume means nothing happens on ddog_trace_exporter_cancel_token_cancel.

/// currently passed to [`ddog_trace_exporter_send_trace_chunks`], the
Copy link
Copy Markdown
Member

@marcotc marcotc May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// currently passed to [`ddog_trace_exporter_send_trace_chunks`], the
/// currently help by [`ddog_trace_exporter_send_trace_chunks`], the

Wouldn't "currently held" be more accurate than "currently passed" here, since "ddog_trace_exporter_send_trace_chunks" might have finished its work already, thus not holding the token anymore?

/// in-flight HTTP request will be aborted cooperatively.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does cooperatively mean here?

///
/// # Safety
///
/// * `token` must point to a valid [`Handle`] returned by
/// [`ddog_trace_exporter_cancel_token_new`].
Comment on lines +320 to +323
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// # Safety
///
/// * `token` must point to a valid [`Handle`] returned by
/// [`ddog_trace_exporter_cancel_token_new`].
/// * `token` is the return of [`ddog_trace_exporter_cancel_token_new`].

I don't think we need to say that we should give this function a "valid" input.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same applies to ddog_trace_exporter_cancel_token_drop.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's more of these Safety comment blocks in down this file. All the ones in a similar "type-checked argument must be type-valid" is likely not needed.

#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_cancel_token_cancel(
mut token: *mut Handle<TokioCancellationToken>,
) {
if let Ok(inner) = token.to_inner_mut() {
inner.cancel();
}
}

/// Free a cancellation token handle.
///
/// After this call the pointer is invalid and must not be reused.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is useful additional information "After this call the pointer is invalid and must not be reused.".

Freed resources cannot be used and are likely invalid.

If we want to provide additional context here, the only thing I can think of is: what happens if you free it while ddog_trace_exporter_send_trace_chunks still owns it. But I feel like that's kind of a natural contract in Rust/native: freeing things in use == bad.
Not sure if we need an extra comment here.

///
/// # Safety
///
/// * `token` must point to a valid [`Handle`] returned by
/// [`ddog_trace_exporter_cancel_token_new`], or be null.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_cancel_token_drop(
mut token: *mut Handle<TokioCancellationToken>,
) {
drop(token.take());
}

// ---------------------------------------------------------------------------
// Send trace chunks
// ---------------------------------------------------------------------------
Expand All @@ -305,6 +355,12 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span(
/// serializes in the configured output format, and sends to the agent
/// with retry logic.
///
/// When `cancel` is non-null it must point to a live
/// [`Handle<CancellationToken>`] obtained from
/// [`ddog_trace_exporter_cancel_token_new`]. If the token is cancelled
/// while the send is in progress the HTTP request is aborted and an
/// error with code [`ExporterErrorCode::IoError`] is returned.
///
/// On success, if `response_out` is non-null, a heap-allocated
/// [`ExporterResponse`] is written there. The caller owns it and must
/// free it with `ddog_trace_exporter_response_free`.
Expand All @@ -315,11 +371,13 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span(
/// * `chunks` is consumed and must not be used after this call.
/// * If `response_out` is non-null it must point to valid writable memory for a
/// `Box<ExporterResponse>`.
/// * If `cancel` is non-null it must point to a valid cancellation token handle.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to those in ddog_trace_exporter_cancel_token_cancel: is it even possible to pass an invalid cancellation token since the argument is typed *mut Handle<TokioCancellationToken>?

#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks(
exporter: Option<&TraceExporter>,
chunks: Option<Box<TracerTraceChunks>>,
response_out: Option<NonNull<Box<ExporterResponse>>>,
mut cancel: *mut Handle<TokioCancellationToken>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to not use the Handle and use an Option<&TokioCancellationToken> instead. This way the null pointer is treated as None without any conversion.

) -> Option<Box<ExporterError>> {
let Some(exporter) = exporter else {
return gen_error!(ErrorCode::InvalidArgument);
Expand All @@ -328,15 +386,132 @@ pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks(
return gen_error!(ErrorCode::InvalidArgument);
};

// Clone the cancellation token (if provided) so the caller retains
// ownership of the handle while we use a cheap clone inside select!.
let cancel_token: Option<TokioCancellationToken> = if cancel.is_null() {
None
} else {
cancel.to_inner_mut().ok().map(|t| t.clone())
};

catch_panic!(
match exporter.send_trace_chunks(chunks.0) {
Ok(resp) => {
if let Some(out) = response_out {
out.as_ptr().write(Box::new(ExporterResponse::from(resp)));
{
let result = if let Some(ct) = cancel_token {
// Use select! so we can abort the in-flight request when
// the caller cancels.
let block_result = exporter.shared_runtime().block_on(async {
tokio::select! {
res = exporter.send_trace_chunks_async(chunks.0) => res,
_ = ct.cancelled() => Err(
std::io::Error::new(
std::io::ErrorKind::Interrupted,
"send cancelled via cancellation token",
).into()
),
}
});
Comment on lines +402 to +412
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be called out in the function's doc that using the cancel token may result in data loss

match block_result {
Ok(inner) => inner,
Err(io_err) => Err(io_err.into()),
}
} else {
exporter.send_trace_chunks(chunks.0)
};

match result {
Ok(resp) => {
if let Some(out) = response_out {
Comment on lines +402 to +423
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancel token is a nice add-on and should be added to the actual trace exporter, we try to keep logic in the *-ffi crates to a minimum.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the send_trace_chunks_async has a bug and shouldn't be used currently (it should have been marked as experimental). I can get it fixed, but the simplest way to bypass it is to add support for the cancel token to send_trace_chunks

out.as_ptr().write(Box::new(ExporterResponse::from(resp)));
}
None
}
None
Err(e) => Some(Box::new(ExporterError::from(e))),
}
Err(e) => Some(Box::new(ExporterError::from(e))),
},
gen_error!(ErrorCode::Panic)
)
}

// ---------------------------------------------------------------------------
// Fork safety hooks
// ---------------------------------------------------------------------------

/// Must be called in the parent process before `fork()`.
///
/// Pauses all workers on the exporter's [`SharedRuntime`] so that no
/// background threads are running during the fork.
///
/// # Safety
///
/// * `exporter` must be a valid `TraceExporter` pointer or null.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_before_fork(
exporter: Option<&TraceExporter>,
) -> Option<Box<ExporterError>> {
let Some(exporter) = exporter else {
return gen_error!(ErrorCode::InvalidArgument);
};

catch_panic!(
{
exporter.shared_runtime().before_fork();
None
},
gen_error!(ErrorCode::Panic)
)
}

/// Must be called in the parent process after `fork()`.
///
/// Restarts workers that were paused by
/// [`ddog_trace_exporter_before_fork`].
///
/// # Safety
///
/// * `exporter` must be a valid `TraceExporter` pointer or null.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_after_fork_in_parent(
exporter: Option<&TraceExporter>,
) -> Option<Box<ExporterError>> {
let Some(exporter) = exporter else {
return gen_error!(ErrorCode::InvalidArgument);
};

catch_panic!(
match exporter.shared_runtime().after_fork_parent() {
Ok(()) => None,
Err(e) => Some(Box::new(ExporterError::new(
ErrorCode::Internal,
&e.to_string(),
))),
},
gen_error!(ErrorCode::Panic)
)
}

/// Must be called in the child process after `fork()`.
///
/// Creates a fresh tokio runtime and restarts all workers on the
/// exporter's [`SharedRuntime`].
///
/// # Safety
///
/// * `exporter` must be a valid `TraceExporter` pointer or null.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_after_fork_in_child(
exporter: Option<&TraceExporter>,
) -> Option<Box<ExporterError>> {
let Some(exporter) = exporter else {
return gen_error!(ErrorCode::InvalidArgument);
};

catch_panic!(
match exporter.shared_runtime().after_fork_child() {
Ok(()) => None,
Err(e) => Some(Box::new(ExporterError::new(
ErrorCode::Internal,
&e.to_string(),
))),
},
gen_error!(ErrorCode::Panic)
)
Expand Down Expand Up @@ -651,7 +826,12 @@ mod tests {
fn send_trace_chunks_null_exporter_returns_error() {
unsafe {
let chunks = make_chunks(0);
let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None);
let err = ddog_trace_exporter_send_trace_chunks(
None,
Some(chunks),
None,
std::ptr::null_mut(),
);
assert!(err.is_some());
assert_eq!(err.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(err);
Expand Down Expand Up @@ -687,4 +867,73 @@ mod tests {
ddog_tracer_trace_chunks_free(chunks);
}
}

// -- Cancellation token -------------------------------------------------

#[test]
fn cancel_token_new_and_drop() {
unsafe {
let mut token = ddog_trace_exporter_cancel_token_new();
let ptr: *mut Handle<TokioCancellationToken> = &mut token;
ddog_trace_exporter_cancel_token_drop(ptr);
}
}

#[test]
fn cancel_token_cancel() {
unsafe {
let mut token = ddog_trace_exporter_cancel_token_new();
let ptr: *mut Handle<TokioCancellationToken> = &mut token;
ddog_trace_exporter_cancel_token_cancel(ptr);
ddog_trace_exporter_cancel_token_drop(ptr);
}
}

#[test]
fn send_trace_chunks_null_cancel_is_accepted() {
// Passing a null cancel pointer should behave like the old
// signature (no cancellation).
unsafe {
let chunks = make_chunks(0);
let err = ddog_trace_exporter_send_trace_chunks(
None,
Some(chunks),
None,
std::ptr::null_mut(),
);
// exporter is None, so we get InvalidArgument, but no crash
// from the null cancel pointer.
assert!(err.is_some());
ddog_trace_exporter_error_free(err);
}
}

// -- Fork safety hooks --------------------------------------------------

#[test]
fn before_fork_null_returns_error() {
unsafe {
let err = ddog_trace_exporter_before_fork(None);
assert!(err.is_some());
ddog_trace_exporter_error_free(err);
}
}

#[test]
fn after_fork_in_parent_null_returns_error() {
unsafe {
let err = ddog_trace_exporter_after_fork_in_parent(None);
assert!(err.is_some());
ddog_trace_exporter_error_free(err);
}
}

#[test]
fn after_fork_in_child_null_returns_error() {
unsafe {
let err = ddog_trace_exporter_after_fork_in_child(None);
assert!(err.is_some());
ddog_trace_exporter_error_free(err);
}
}
}
5 changes: 5 additions & 0 deletions libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
TraceExporterBuilder::default()
}

/// Returns a reference to the underlying [`SharedRuntime`].
pub fn shared_runtime(&self) -> &SharedRuntime {
&self.shared_runtime
}

Comment on lines +212 to +216
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SharedRuntime reference in the TraceExporter should remain private if you need to interact with the SharedRuntime (e.g. for fork hooks) you should create a SharedRuntime with libdd-shared-runtime-ffi and pass it to the TraceExporter builder

/// Stop the background workers owned by this exporter.
///
/// Only the workers spawned for this exporter are stopped. Workers from other components
Expand Down
Loading