-
Notifications
You must be signed in to change notification settings - Fork 20
Add fork safety hooks and cancellation token for trace exporter FFI #2051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks"] | |
| "TracerSpan" = "ddog_TracerSpan" | ||
| "TracerSpanFields" = "ddog_TracerSpanFields" | ||
| "TracerTraceChunks" = "ddog_TracerTraceChunks" | ||
| "Handle_TokioCancellationToken" = "ddog_TraceExporterCancelToken" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
With only the handle rename here, cbindgen emits Useful? React with 👍 / 👎. |
||
|
|
||
| [export.mangle] | ||
| rename_types = "PascalCase" | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -13,12 +13,15 @@ use crate::error::{ExporterError, ExporterErrorCode as ErrorCode}; | |||||||||||
| use crate::response::ExporterResponse; | ||||||||||||
| use crate::trace_exporter::TraceExporter; | ||||||||||||
| use crate::{catch_panic, gen_error}; | ||||||||||||
| use libdd_common_ffi::handle::{Handle, ToInner}; | ||||||||||||
| use libdd_common_ffi::slice::AsBytes; | ||||||||||||
| use libdd_common_ffi::CharSlice; | ||||||||||||
| use libdd_tinybytes::BytesString; | ||||||||||||
| use libdd_trace_utils::span::v04::SpanBytes; | ||||||||||||
| use std::ptr::NonNull; | ||||||||||||
|
|
||||||||||||
| type TokioCancellationToken = tokio_util::sync::CancellationToken; | ||||||||||||
|
|
||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||
| // Helper | ||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||
|
|
@@ -295,6 +298,53 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span( | |||||||||||
| ) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||
| // Cancellation token | ||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||
|
|
||||||||||||
| /// Create a new cancellation token. | ||||||||||||
| /// | ||||||||||||
| /// The returned handle must be freed with | ||||||||||||
| /// [`ddog_trace_exporter_cancel_token_drop`]. | ||||||||||||
| #[no_mangle] | ||||||||||||
| pub extern "C" fn ddog_trace_exporter_cancel_token_new() -> Handle<TokioCancellationToken> { | ||||||||||||
| Handle::from(TokioCancellationToken::new()) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Cancel a cancellation token. | ||||||||||||
| /// | ||||||||||||
| /// All clones of the same token observe the cancellation. If the token is | ||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if the token was not passed to |
||||||||||||
| /// currently passed to [`ddog_trace_exporter_send_trace_chunks`], the | ||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Wouldn't "currently held" be more accurate than "currently passed" here, since "ddog_trace_exporter_send_trace_chunks" might have finished its work already, thus not holding the token anymore? |
||||||||||||
| /// in-flight HTTP request will be aborted cooperatively. | ||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does |
||||||||||||
| /// | ||||||||||||
| /// # Safety | ||||||||||||
| /// | ||||||||||||
| /// * `token` must point to a valid [`Handle`] returned by | ||||||||||||
| /// [`ddog_trace_exporter_cancel_token_new`]. | ||||||||||||
|
Comment on lines
+320
to
+323
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I don't think we need to say that we should give this function a "valid" input.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same applies to
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's more of these |
||||||||||||
| #[no_mangle] | ||||||||||||
| pub unsafe extern "C" fn ddog_trace_exporter_cancel_token_cancel( | ||||||||||||
| mut token: *mut Handle<TokioCancellationToken>, | ||||||||||||
| ) { | ||||||||||||
| if let Ok(inner) = token.to_inner_mut() { | ||||||||||||
| inner.cancel(); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Free a cancellation token handle. | ||||||||||||
| /// | ||||||||||||
| /// After this call the pointer is invalid and must not be reused. | ||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if this is useful additional information "After this call the pointer is invalid and must not be reused.". Freed resources cannot be used and are likely invalid. If we want to provide additional context here, the only thing I can think of is: what happens if you free it while |
||||||||||||
| /// | ||||||||||||
| /// # Safety | ||||||||||||
| /// | ||||||||||||
| /// * `token` must point to a valid [`Handle`] returned by | ||||||||||||
| /// [`ddog_trace_exporter_cancel_token_new`], or be null. | ||||||||||||
| #[no_mangle] | ||||||||||||
| pub unsafe extern "C" fn ddog_trace_exporter_cancel_token_drop( | ||||||||||||
| mut token: *mut Handle<TokioCancellationToken>, | ||||||||||||
| ) { | ||||||||||||
| drop(token.take()); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||
| // Send trace chunks | ||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||
|
|
@@ -305,6 +355,12 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span( | |||||||||||
| /// serializes in the configured output format, and sends to the agent | ||||||||||||
| /// with retry logic. | ||||||||||||
| /// | ||||||||||||
| /// When `cancel` is non-null it must point to a live | ||||||||||||
| /// [`Handle<CancellationToken>`] obtained from | ||||||||||||
| /// [`ddog_trace_exporter_cancel_token_new`]. If the token is cancelled | ||||||||||||
| /// while the send is in progress the HTTP request is aborted and an | ||||||||||||
| /// error with code [`ExporterErrorCode::IoError`] is returned. | ||||||||||||
| /// | ||||||||||||
| /// On success, if `response_out` is non-null, a heap-allocated | ||||||||||||
| /// [`ExporterResponse`] is written there. The caller owns it and must | ||||||||||||
| /// free it with `ddog_trace_exporter_response_free`. | ||||||||||||
|
|
@@ -315,11 +371,13 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span( | |||||||||||
| /// * `chunks` is consumed and must not be used after this call. | ||||||||||||
| /// * If `response_out` is non-null it must point to valid writable memory for a | ||||||||||||
| /// `Box<ExporterResponse>`. | ||||||||||||
| /// * If `cancel` is non-null it must point to a valid cancellation token handle. | ||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar comment to those in |
||||||||||||
| #[no_mangle] | ||||||||||||
| pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks( | ||||||||||||
| exporter: Option<&TraceExporter>, | ||||||||||||
| chunks: Option<Box<TracerTraceChunks>>, | ||||||||||||
| response_out: Option<NonNull<Box<ExporterResponse>>>, | ||||||||||||
| mut cancel: *mut Handle<TokioCancellationToken>, | ||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's better to not use the Handle and use an |
||||||||||||
| ) -> Option<Box<ExporterError>> { | ||||||||||||
| let Some(exporter) = exporter else { | ||||||||||||
| return gen_error!(ErrorCode::InvalidArgument); | ||||||||||||
|
|
@@ -328,15 +386,132 @@ pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks( | |||||||||||
| return gen_error!(ErrorCode::InvalidArgument); | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| // Clone the cancellation token (if provided) so the caller retains | ||||||||||||
| // ownership of the handle while we use a cheap clone inside select!. | ||||||||||||
| let cancel_token: Option<TokioCancellationToken> = if cancel.is_null() { | ||||||||||||
| None | ||||||||||||
| } else { | ||||||||||||
| cancel.to_inner_mut().ok().map(|t| t.clone()) | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| catch_panic!( | ||||||||||||
| match exporter.send_trace_chunks(chunks.0) { | ||||||||||||
| Ok(resp) => { | ||||||||||||
| if let Some(out) = response_out { | ||||||||||||
| out.as_ptr().write(Box::new(ExporterResponse::from(resp))); | ||||||||||||
| { | ||||||||||||
| let result = if let Some(ct) = cancel_token { | ||||||||||||
| // Use select! so we can abort the in-flight request when | ||||||||||||
| // the caller cancels. | ||||||||||||
| let block_result = exporter.shared_runtime().block_on(async { | ||||||||||||
| tokio::select! { | ||||||||||||
| res = exporter.send_trace_chunks_async(chunks.0) => res, | ||||||||||||
| _ = ct.cancelled() => Err( | ||||||||||||
| std::io::Error::new( | ||||||||||||
| std::io::ErrorKind::Interrupted, | ||||||||||||
| "send cancelled via cancellation token", | ||||||||||||
| ).into() | ||||||||||||
| ), | ||||||||||||
| } | ||||||||||||
| }); | ||||||||||||
|
Comment on lines
+402
to
+412
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should be called out in the function's doc that using the cancel token may result in data loss |
||||||||||||
| match block_result { | ||||||||||||
| Ok(inner) => inner, | ||||||||||||
| Err(io_err) => Err(io_err.into()), | ||||||||||||
| } | ||||||||||||
| } else { | ||||||||||||
| exporter.send_trace_chunks(chunks.0) | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| match result { | ||||||||||||
| Ok(resp) => { | ||||||||||||
| if let Some(out) = response_out { | ||||||||||||
|
Comment on lines
+402
to
+423
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The cancel token is a nice add-on and should be added to the actual trace exporter, we try to keep logic in the *-ffi crates to a minimum.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also the |
||||||||||||
| out.as_ptr().write(Box::new(ExporterResponse::from(resp))); | ||||||||||||
| } | ||||||||||||
| None | ||||||||||||
| } | ||||||||||||
| None | ||||||||||||
| Err(e) => Some(Box::new(ExporterError::from(e))), | ||||||||||||
| } | ||||||||||||
| Err(e) => Some(Box::new(ExporterError::from(e))), | ||||||||||||
| }, | ||||||||||||
| gen_error!(ErrorCode::Panic) | ||||||||||||
| ) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||
| // Fork safety hooks | ||||||||||||
| // --------------------------------------------------------------------------- | ||||||||||||
|
|
||||||||||||
| /// Must be called in the parent process before `fork()`. | ||||||||||||
| /// | ||||||||||||
| /// Pauses all workers on the exporter's [`SharedRuntime`] so that no | ||||||||||||
| /// background threads are running during the fork. | ||||||||||||
| /// | ||||||||||||
| /// # Safety | ||||||||||||
| /// | ||||||||||||
| /// * `exporter` must be a valid `TraceExporter` pointer or null. | ||||||||||||
| #[no_mangle] | ||||||||||||
| pub unsafe extern "C" fn ddog_trace_exporter_before_fork( | ||||||||||||
| exporter: Option<&TraceExporter>, | ||||||||||||
| ) -> Option<Box<ExporterError>> { | ||||||||||||
| let Some(exporter) = exporter else { | ||||||||||||
| return gen_error!(ErrorCode::InvalidArgument); | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| catch_panic!( | ||||||||||||
| { | ||||||||||||
| exporter.shared_runtime().before_fork(); | ||||||||||||
| None | ||||||||||||
| }, | ||||||||||||
| gen_error!(ErrorCode::Panic) | ||||||||||||
| ) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Must be called in the parent process after `fork()`. | ||||||||||||
| /// | ||||||||||||
| /// Restarts workers that were paused by | ||||||||||||
| /// [`ddog_trace_exporter_before_fork`]. | ||||||||||||
| /// | ||||||||||||
| /// # Safety | ||||||||||||
| /// | ||||||||||||
| /// * `exporter` must be a valid `TraceExporter` pointer or null. | ||||||||||||
| #[no_mangle] | ||||||||||||
| pub unsafe extern "C" fn ddog_trace_exporter_after_fork_in_parent( | ||||||||||||
| exporter: Option<&TraceExporter>, | ||||||||||||
| ) -> Option<Box<ExporterError>> { | ||||||||||||
| let Some(exporter) = exporter else { | ||||||||||||
| return gen_error!(ErrorCode::InvalidArgument); | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| catch_panic!( | ||||||||||||
| match exporter.shared_runtime().after_fork_parent() { | ||||||||||||
| Ok(()) => None, | ||||||||||||
| Err(e) => Some(Box::new(ExporterError::new( | ||||||||||||
| ErrorCode::Internal, | ||||||||||||
| &e.to_string(), | ||||||||||||
| ))), | ||||||||||||
| }, | ||||||||||||
| gen_error!(ErrorCode::Panic) | ||||||||||||
| ) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Must be called in the child process after `fork()`. | ||||||||||||
| /// | ||||||||||||
| /// Creates a fresh tokio runtime and restarts all workers on the | ||||||||||||
| /// exporter's [`SharedRuntime`]. | ||||||||||||
| /// | ||||||||||||
| /// # Safety | ||||||||||||
| /// | ||||||||||||
| /// * `exporter` must be a valid `TraceExporter` pointer or null. | ||||||||||||
| #[no_mangle] | ||||||||||||
| pub unsafe extern "C" fn ddog_trace_exporter_after_fork_in_child( | ||||||||||||
| exporter: Option<&TraceExporter>, | ||||||||||||
| ) -> Option<Box<ExporterError>> { | ||||||||||||
| let Some(exporter) = exporter else { | ||||||||||||
| return gen_error!(ErrorCode::InvalidArgument); | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| catch_panic!( | ||||||||||||
| match exporter.shared_runtime().after_fork_child() { | ||||||||||||
| Ok(()) => None, | ||||||||||||
| Err(e) => Some(Box::new(ExporterError::new( | ||||||||||||
| ErrorCode::Internal, | ||||||||||||
| &e.to_string(), | ||||||||||||
| ))), | ||||||||||||
| }, | ||||||||||||
| gen_error!(ErrorCode::Panic) | ||||||||||||
| ) | ||||||||||||
|
|
@@ -651,7 +826,12 @@ mod tests { | |||||||||||
| fn send_trace_chunks_null_exporter_returns_error() { | ||||||||||||
| unsafe { | ||||||||||||
| let chunks = make_chunks(0); | ||||||||||||
| let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None); | ||||||||||||
| let err = ddog_trace_exporter_send_trace_chunks( | ||||||||||||
| None, | ||||||||||||
| Some(chunks), | ||||||||||||
| None, | ||||||||||||
| std::ptr::null_mut(), | ||||||||||||
| ); | ||||||||||||
| assert!(err.is_some()); | ||||||||||||
| assert_eq!(err.as_ref().unwrap().code, ErrorCode::InvalidArgument); | ||||||||||||
| ddog_trace_exporter_error_free(err); | ||||||||||||
|
|
@@ -687,4 +867,73 @@ mod tests { | |||||||||||
| ddog_tracer_trace_chunks_free(chunks); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // -- Cancellation token ------------------------------------------------- | ||||||||||||
|
|
||||||||||||
| #[test] | ||||||||||||
| fn cancel_token_new_and_drop() { | ||||||||||||
| unsafe { | ||||||||||||
| let mut token = ddog_trace_exporter_cancel_token_new(); | ||||||||||||
| let ptr: *mut Handle<TokioCancellationToken> = &mut token; | ||||||||||||
| ddog_trace_exporter_cancel_token_drop(ptr); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| #[test] | ||||||||||||
| fn cancel_token_cancel() { | ||||||||||||
| unsafe { | ||||||||||||
| let mut token = ddog_trace_exporter_cancel_token_new(); | ||||||||||||
| let ptr: *mut Handle<TokioCancellationToken> = &mut token; | ||||||||||||
| ddog_trace_exporter_cancel_token_cancel(ptr); | ||||||||||||
| ddog_trace_exporter_cancel_token_drop(ptr); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| #[test] | ||||||||||||
| fn send_trace_chunks_null_cancel_is_accepted() { | ||||||||||||
| // Passing a null cancel pointer should behave like the old | ||||||||||||
| // signature (no cancellation). | ||||||||||||
| unsafe { | ||||||||||||
| let chunks = make_chunks(0); | ||||||||||||
| let err = ddog_trace_exporter_send_trace_chunks( | ||||||||||||
| None, | ||||||||||||
| Some(chunks), | ||||||||||||
| None, | ||||||||||||
| std::ptr::null_mut(), | ||||||||||||
| ); | ||||||||||||
| // exporter is None, so we get InvalidArgument, but no crash | ||||||||||||
| // from the null cancel pointer. | ||||||||||||
| assert!(err.is_some()); | ||||||||||||
| ddog_trace_exporter_error_free(err); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| // -- Fork safety hooks -------------------------------------------------- | ||||||||||||
|
|
||||||||||||
| #[test] | ||||||||||||
| fn before_fork_null_returns_error() { | ||||||||||||
| unsafe { | ||||||||||||
| let err = ddog_trace_exporter_before_fork(None); | ||||||||||||
| assert!(err.is_some()); | ||||||||||||
| ddog_trace_exporter_error_free(err); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| #[test] | ||||||||||||
| fn after_fork_in_parent_null_returns_error() { | ||||||||||||
| unsafe { | ||||||||||||
| let err = ddog_trace_exporter_after_fork_in_parent(None); | ||||||||||||
| assert!(err.is_some()); | ||||||||||||
| ddog_trace_exporter_error_free(err); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| #[test] | ||||||||||||
| fn after_fork_in_child_null_returns_error() { | ||||||||||||
| unsafe { | ||||||||||||
| let err = ddog_trace_exporter_after_fork_in_child(None); | ||||||||||||
| assert!(err.is_some()); | ||||||||||||
| ddog_trace_exporter_error_free(err); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -209,6 +209,11 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra | |
| TraceExporterBuilder::default() | ||
| } | ||
|
|
||
| /// Returns a reference to the underlying [`SharedRuntime`]. | ||
| pub fn shared_runtime(&self) -> &SharedRuntime { | ||
| &self.shared_runtime | ||
| } | ||
|
|
||
|
Comment on lines
+212
to
+216
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The SharedRuntime reference in the TraceExporter should remain private if you need to interact with the SharedRuntime (e.g. for fork hooks) you should create a SharedRuntime with |
||
| /// Stop the background workers owned by this exporter. | ||
| /// | ||
| /// Only the workers spawned for this exporter are stopped. Workers from other components | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding
tokio/tokio-utilto this crate without committing the corresponding rootCargo.lockupdate breaks locked builds: I checkedcargo check -p libdd-data-pipeline-ffi --locked, and Cargo exits withthe lock file /workspace/libdatadog/Cargo.lock needs to be updated. Any CI or consumer build using the repository lockfile will fail before compiling this crate until the lock entry forlibdd-data-pipeline-ffiincludes these new dependencies.Useful? React with 👍 / 👎.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
edit: I think I was wrong, looks like a legit failure: https://github.com/DataDog/libdatadog/actions/runs/26578731968/job/78305509370?pr=2051
original comment:
This feels like a case of "AI doesn't know how "libdd-data-pipeline-ffi" is used.
This might information be worth writing down (maybe in https://github.com/DataDog/libdatadog/blob/22e92370e5d66c0efd6ac5fe949bd8b6514fd902/libdd-data-pipeline-ffi/README.md, or other place if this is common across multiple
libdatadog/*crates).