diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index c9430eadf2..0f3ce79dfb 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -173,6 +173,16 @@ impl TraceExporterBuilder { self } + /// Opt in to the V1 trace protocol. + /// + /// V1 is only used after runtime negotiation with the agent via `/info`. When the agent does + /// not advertise the `/v1.0/traces` endpoint, the exporter falls back to V0.4 transparently. + /// V1 is only compatible with V0.4 input. + pub fn enable_v1_protocol(&mut self) -> &mut Self { + self.output_format = TraceExporterOutputFormat::V1; + self + } + /// Set the header indicating the tracer has computed the top-level tag pub fn set_client_computed_top_level(&mut self) -> &mut Self { self.client_computed_top_level = true; @@ -465,7 +475,9 @@ impl TraceExporterBuilder { }, input_format: self.input_format, output_format: self.output_format, - serializer: TraceSerializer::new(self.output_format), + v1_active: std::sync::atomic::AtomicBool::new(false), + v1_unavailable_logged: std::sync::Once::new(), + serializer: TraceSerializer::new(), client_computed_top_level: self.client_computed_top_level, shared_runtime, dogstatsd, @@ -621,4 +633,54 @@ mod tests { BuilderErrorKind::InvalidUri("empty string".to_string()) ); } + + #[test] + fn test_enable_v1_protocol_sets_output_format() { + let mut builder = TraceExporterBuilder::default(); + builder.enable_v1_protocol(); + assert!(matches!( + builder.output_format, + TraceExporterOutputFormat::V1 + )); + } + + #[test] + fn test_v1_input_v05_incompatible() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_input_format(TraceExporterInputFormat::V05) + .set_output_format(TraceExporterOutputFormat::V1); + let result = builder.build::(); + assert!(matches!( + result, + Err(TraceExporterError::Builder( + BuilderErrorKind::InvalidConfiguration(_) + )) + )); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn test_build_with_v1_starts_inactive() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_input_format(TraceExporterInputFormat::V04) + .enable_v1_protocol(); + let exporter = builder.build::().unwrap(); + + assert!(matches!( + exporter.output_format, + TraceExporterOutputFormat::V1 + )); + assert!(!exporter + .v1_active + .load(std::sync::atomic::Ordering::Relaxed)); + assert_eq!( + exporter + .effective_output_format() + .add_path(&exporter.endpoint.url) + .to_string(), + "http://127.0.0.1:8126/v0.4/traces" + ); + } } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index fecd2b91ef..a0785bfcfb 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -47,13 +47,17 @@ use libdd_trace_utils::send_with_retry::{ use libdd_trace_utils::span::{v04::Span, TraceData}; use libdd_trace_utils::trace_utils::TracerHeaderTags; use std::io; -use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Once}; use std::time::Duration; use std::{borrow::Borrow, str::FromStr}; use tokio::task::JoinSet; use tracing::{debug, error, warn}; const INFO_ENDPOINT: &str = "/info"; +const V04_TRACES_ENDPOINT: &str = "/v0.4/traces"; +const V05_TRACES_ENDPOINT: &str = "/v0.5/traces"; +const V1_TRACES_ENDPOINT: &str = "/v1.0/traces"; /// Values for optional telemetry HTTP session headers (`dd-session-id`, root/parent). #[derive(Debug, Default, Clone)] @@ -92,9 +96,9 @@ impl TraceExporterOutputFormat { add_path( url, match self { - TraceExporterOutputFormat::V04 => "/v0.4/traces", - TraceExporterOutputFormat::V05 => "/v0.5/traces", - TraceExporterOutputFormat::V1 => "/v1.0/traces", + TraceExporterOutputFormat::V04 => V04_TRACES_ENDPOINT, + TraceExporterOutputFormat::V05 => V05_TRACES_ENDPOINT, + TraceExporterOutputFormat::V1 => V1_TRACES_ENDPOINT, }, ) } @@ -182,6 +186,13 @@ pub struct TraceExporter, /// None if dogstatsd is disabled @@ -347,6 +358,9 @@ impl Tra fn check_agent_info(&self) { if let Some(agent_info) = agent_info::get_agent_info() { if self.has_agent_info_state_changed(&agent_info) { + if matches!(self.output_format, TraceExporterOutputFormat::V1) { + self.refresh_v1_active(&agent_info); + } match &**self.client_side_stats.status.load() { StatsComputationStatus::Disabled => {} StatsComputationStatus::DisabledByAgent { .. } => { @@ -390,6 +404,35 @@ impl Tra // never computed. This is intentionally a no-op. } + /// Reconcile `v1_active` with the agent's currently-advertised endpoints. Called only when + /// V1 is configured and the agent info state has changed, so transitions are logged at most + /// once per change. Note: `v1_active` can also transition `true → false` outside this path, + /// via the fail-closed hook in `send_trace_chunks_inner` when the agent returns 404 on + /// `/v1.0/traces` (the agent does not bump its state hash on 404). + #[cfg(not(target_arch = "wasm32"))] + fn refresh_v1_active(&self, agent_info: &Arc) { + let supports_v1 = agent_info + .info + .endpoints + .as_ref() + .is_some_and(|e| e.iter().any(|p| p == V1_TRACES_ENDPOINT)); + let previous = self.v1_active.swap(supports_v1, Ordering::Relaxed); + match (previous, supports_v1) { + (false, true) => debug!("V1 trace protocol enabled (agent advertises /v1.0/traces)"), + (true, false) => { + warn!("V1 trace protocol no longer advertised by agent; falling back to v0.4") + } + (false, false) => { + self.v1_unavailable_logged.call_once(|| { + warn!( + "V1 trace protocol requested by SDK but agent does not advertise {V1_TRACES_ENDPOINT}; continuing on v0.4" + ); + }); + } + (true, true) => {} + } + } + /// !!! This function is only for testing purposes !!! /// /// Waits the agent info to be ready by checking the agent_info state. @@ -595,11 +638,16 @@ impl Tra return self.send_otlp_traces_inner(traces, config).await; } + // Snapshot the effective format once so the serializer and the URL agree even if + // `v1_active` flips mid-send (the background `/info` fetcher can race us otherwise). + let effective_format = self.effective_output_format(); + let prepared = match self.serializer.prepare_traces_payload( traces, header_tags, &self.metadata, self.agent_payload_response_version.as_ref(), + effective_format, ) { Ok(p) => p, Err(e) => { @@ -613,18 +661,39 @@ impl Tra }; let endpoint = Endpoint { - url: self.get_agent_url(), + url: effective_format.add_path(&self.endpoint.url), ..self.endpoint.clone() }; - self.send_traces_with_telemetry( - &endpoint, - prepared.data, - prepared.headers, - prepared.chunk_count, - dropped_p0_stats.dropped_p0_traces, - ) - .await + let result = self + .send_traces_with_telemetry( + &endpoint, + prepared.data, + prepared.headers, + prepared.chunk_count, + dropped_p0_stats.dropped_p0_traces, + ) + .await; + + // State-hash trap mitigation: the agent does not return a `Datadog-Agent-State` + // header on 404, so without this hook we'd stay pinned to V1 until the next `/info` + // poll (up to the fetcher's refresh interval). On a 404 to `/v1.0/traces`, fail + // closed immediately and force an `/info` refresh so the next send uses V0.4 and + // V1 support is re-detected as soon as the agent advertises it again. + if effective_format == TraceExporterOutputFormat::V1 { + if let Err(TraceExporterError::Request(ref e)) = result { + if e.status() == http::StatusCode::NOT_FOUND + && self.v1_active.swap(false, Ordering::Relaxed) + { + warn!( + "V1 trace send returned 404; agent no longer advertises {V1_TRACES_ENDPOINT} — falling back to V0.4" + ); + self.info_response_observer.manual_trigger(); + } + } + } + + result } /// Handle the result of sending traces to the agent @@ -810,8 +879,21 @@ impl Tra ) } - fn get_agent_url(&self) -> Uri { - self.output_format.add_path(&self.endpoint.url) + /// Return the trace output format that will actually be used to encode and send the next + /// payload. + /// + /// When V1 is configured, the effective format is V1 only after the agent has advertised + /// `/v1.0/traces` via the `/info` endpoint (fail-closed). Until then — and any time the + /// agent rolls back this capability — V1 transparently falls back to V0.4. V0.4 and V0.5 + /// pass through unchanged. + fn effective_output_format(&self) -> TraceExporterOutputFormat { + match self.output_format { + TraceExporterOutputFormat::V1 if self.v1_active.load(Ordering::Relaxed) => { + TraceExporterOutputFormat::V1 + } + TraceExporterOutputFormat::V1 => TraceExporterOutputFormat::V04, + other => other, + } } #[cfg(test)] @@ -901,6 +983,148 @@ mod tests { assert!(headers.contains_key("datadog-client-computed-top-level")); } + #[cfg_attr(miri, ignore)] + #[test] + fn test_effective_output_format_v04_passthrough() { + let exporter = build_test_exporter( + "http://127.0.0.1:8126".to_string(), + None, + TraceExporterInputFormat::V04, + TraceExporterOutputFormat::V04, + false, + false, + ); + assert!(matches!( + exporter.effective_output_format(), + TraceExporterOutputFormat::V04 + )); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn test_effective_output_format_v1_pre_negotiation_falls_back_to_v04() { + let exporter = build_test_exporter( + "http://127.0.0.1:8126".to_string(), + None, + TraceExporterInputFormat::V04, + TraceExporterOutputFormat::V1, + false, + false, + ); + assert!(matches!( + exporter.effective_output_format(), + TraceExporterOutputFormat::V04 + )); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn test_effective_output_format_v1_post_negotiation_uses_v1() { + let exporter = build_test_exporter( + "http://127.0.0.1:8126".to_string(), + None, + TraceExporterInputFormat::V04, + TraceExporterOutputFormat::V1, + false, + false, + ); + exporter + .v1_active + .store(true, std::sync::atomic::Ordering::Relaxed); + assert!(matches!( + exporter.effective_output_format(), + TraceExporterOutputFormat::V1 + )); + assert_eq!( + exporter + .effective_output_format() + .add_path(&exporter.endpoint.url) + .to_string(), + "http://127.0.0.1:8126/v1.0/traces" + ); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn test_refresh_v1_active_enables_when_endpoint_advertised() { + use crate::agent_info::schema::{AgentInfo, AgentInfoStruct}; + let exporter = build_test_exporter( + "http://127.0.0.1:8126".to_string(), + None, + TraceExporterInputFormat::V04, + TraceExporterOutputFormat::V1, + false, + false, + ); + let agent_info = Arc::new(AgentInfo { + state_hash: "hash-1".to_string(), + info: AgentInfoStruct { + endpoints: Some(vec![ + V04_TRACES_ENDPOINT.to_string(), + V1_TRACES_ENDPOINT.to_string(), + ]), + ..Default::default() + }, + }); + exporter.refresh_v1_active(&agent_info); + assert!(exporter + .v1_active + .load(std::sync::atomic::Ordering::Relaxed)); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn test_refresh_v1_active_disables_when_endpoint_disappears() { + use crate::agent_info::schema::{AgentInfo, AgentInfoStruct}; + let exporter = build_test_exporter( + "http://127.0.0.1:8126".to_string(), + None, + TraceExporterInputFormat::V04, + TraceExporterOutputFormat::V1, + false, + false, + ); + exporter + .v1_active + .store(true, std::sync::atomic::Ordering::Relaxed); + let agent_info = Arc::new(AgentInfo { + state_hash: "hash-2".to_string(), + info: AgentInfoStruct { + endpoints: Some(vec![V04_TRACES_ENDPOINT.to_string()]), + ..Default::default() + }, + }); + exporter.refresh_v1_active(&agent_info); + assert!(!exporter + .v1_active + .load(std::sync::atomic::Ordering::Relaxed)); + } + + #[cfg_attr(miri, ignore)] + #[test] + fn test_refresh_v1_active_handles_missing_endpoints_field() { + use crate::agent_info::schema::{AgentInfo, AgentInfoStruct}; + let exporter = build_test_exporter( + "http://127.0.0.1:8126".to_string(), + None, + TraceExporterInputFormat::V04, + TraceExporterOutputFormat::V1, + false, + false, + ); + let agent_info = Arc::new(AgentInfo { + state_hash: "hash-3".to_string(), + info: AgentInfoStruct { + endpoints: None, + ..Default::default() + }, + }); + exporter.refresh_v1_active(&agent_info); + assert!(!exporter + .v1_active + .load(std::sync::atomic::Ordering::Relaxed)); + } + fn read(socket: &net::UdpSocket) -> String { let mut buf = [0; 1_000]; socket.recv(&mut buf).expect("No data"); @@ -960,7 +1184,7 @@ mod tests { }); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + fake_agent.url(V04_TRACES_ENDPOINT), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1030,7 +1254,7 @@ mod tests { let fake_agent = MockServer::start(); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + fake_agent.url(V04_TRACES_ENDPOINT), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1066,7 +1290,7 @@ mod tests { }); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + fake_agent.url(V04_TRACES_ENDPOINT), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1174,7 +1398,7 @@ mod tests { }); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + fake_agent.url(V04_TRACES_ENDPOINT), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1278,7 +1502,7 @@ mod tests { }); let exporter = build_test_exporter( - fake_agent.url("/v0.4/traces"), + fake_agent.url(V04_TRACES_ENDPOINT), Some(stats_socket.local_addr().unwrap().to_string()), TraceExporterInputFormat::V04, TraceExporterOutputFormat::V04, @@ -1451,7 +1675,7 @@ mod tests { } }"#; let traces_endpoint = server.mock(|when, then| { - when.method(POST).path("/v0.4/traces"); + when.method(POST).path(V04_TRACES_ENDPOINT); then.status(200) .header("content-type", "application/json") .body(response_body); @@ -1506,7 +1730,7 @@ mod tests { } }"#; let traces_endpoint = server.mock(|when, then| { - when.method(POST).path("/v0.5/traces"); + when.method(POST).path(V05_TRACES_ENDPOINT); then.status(200) .header("content-type", "application/json") .body(response_body); @@ -1556,7 +1780,7 @@ mod tests { } }"#; let traces_endpoint = server.mock(|when, then| { - when.method(POST).path("/v0.5/traces").is_true(|req| { + when.method(POST).path(V05_TRACES_ENDPOINT).is_true(|req| { let bytes = libdd_tinybytes::Bytes::copy_from_slice(req.body_ref()); bytes.to_vec() == V5_EMPTY }); @@ -1619,7 +1843,7 @@ mod tests { } }"#; let traces_endpoint = server.mock(|when, then| { - when.method(POST).path("/v0.4/traces"); + when.method(POST).path(V04_TRACES_ENDPOINT); then.status(200) .header("content-type", "application/json") .header("datadog-rates-payload-version", "abc") @@ -1654,7 +1878,7 @@ mod tests { } }"#; let mut traces_endpoint = server.mock(|when, then| { - when.method(POST).path("/v0.4/traces"); + when.method(POST).path(V04_TRACES_ENDPOINT); then.status(200) .header("content-type", "application/json") .header("datadog-rates-payload-version", "abc") @@ -1681,7 +1905,7 @@ mod tests { traces_endpoint.delete(); let traces_endpoint = server.mock(|when, then| { - when.method(POST).path("/v0.4/traces"); + when.method(POST).path(V04_TRACES_ENDPOINT); then.status(200) .header("content-type", "application/json") .header("datadog-rates-payload-version", "def") @@ -1738,7 +1962,7 @@ mod tests { let mock_traces = server.mock(|when, then| { when.method(POST) .header("Content-type", "application/msgpack") - .path("/v0.4/traces"); + .path(V04_TRACES_ENDPOINT); then.status(200).body( r#"{ "rate_by_service": { @@ -1749,7 +1973,7 @@ mod tests { }); let mock_info = server.mock(|when, then| { - when.method(GET).path("/info"); + when.method(GET).path(INFO_ENDPOINT); then.delay(delay).status(status).body(response); }); @@ -1854,6 +2078,7 @@ mod tests { #[cfg(test)] mod single_threaded_tests { + use super::stats::STATS_ENDPOINT; use super::*; use crate::agent_info; use httpmock::prelude::*; @@ -1872,23 +2097,25 @@ mod single_threaded_tests { let mock_traces = server.mock(|when, then| { when.method(POST) .header("Content-type", "application/msgpack") - .path("/v0.4/traces"); + .path(V04_TRACES_ENDPOINT); then.status(200).body(""); }); let mock_stats = server.mock(|when, then| { when.method(POST) .header("Content-type", "application/msgpack") - .path("/v0.6/stats"); + .path(STATS_ENDPOINT); then.status(200).body(""); }); let _mock_info = server.mock(|when, then| { - when.method(GET).path("/info"); + when.method(GET).path(INFO_ENDPOINT); then.status(200) .header("content-type", "application/json") .header("datadog-agent-state", "1") - .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#); + .body(format!( + r#"{{"version":"1","client_drop_p0s":true,"endpoints":["{V04_TRACES_ENDPOINT}","{STATS_ENDPOINT}"]}}"# + )); }); let runtime = Arc::new(SharedRuntime::new().unwrap()); @@ -1960,7 +2187,7 @@ mod single_threaded_tests { let mock_traces = server.mock(|when, then| { when.method(POST) .header("Content-type", "application/msgpack") - .path("/v0.4/traces"); + .path(V04_TRACES_ENDPOINT); then.status(200).body( r#"{ "rate_by_service": { @@ -1974,16 +2201,18 @@ mod single_threaded_tests { let _mock_stats = server.mock(|when, then| { when.method(POST) .header("Content-type", "application/msgpack") - .path("/v0.6/stats"); + .path(STATS_ENDPOINT); then.delay(Duration::from_secs(10)).status(200).body(""); }); let _mock_info = server.mock(|when, then| { - when.method(GET).path("/info"); + when.method(GET).path(INFO_ENDPOINT); then.status(200) .header("content-type", "application/json") .header("datadog-agent-state", "1") - .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#); + .body(format!( + r#"{{"version":"1","client_drop_p0s":true,"endpoints":["{V04_TRACES_ENDPOINT}","{STATS_ENDPOINT}"]}}"# + )); }); let runtime = Arc::new(SharedRuntime::new().unwrap()); @@ -2073,25 +2302,27 @@ mod single_threaded_tests { let _mock_traces = server.mock(|when, then| { when.method(POST) .header("Content-type", "application/msgpack") - .path("/v0.4/traces"); + .path(V04_TRACES_ENDPOINT); then.status(200).body(""); }); let _mock_stats = server.mock(|when, then| { when.method(POST) .header("Content-type", "application/msgpack") - .path("/v0.6/stats"); + .path(STATS_ENDPOINT); then.status(200).body(""); }); let info_body = match agent_obfuscation_version { Some(v) => format!( - r#"{{"version":"1","client_drop_p0s":true,"obfuscation_version":{v},"endpoints":["/v0.4/traces","/v0.6/stats"]}}"# + r#"{{"version":"1","client_drop_p0s":true,"obfuscation_version":{v},"endpoints":["{V04_TRACES_ENDPOINT}","{STATS_ENDPOINT}"]}}"# + ), + None => format!( + r#"{{"version":"1","client_drop_p0s":true,"endpoints":["{V04_TRACES_ENDPOINT}","{STATS_ENDPOINT}"]}}"# ), - None => r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#.to_string(), }; let _mock_info = server.mock(|when, then| { - when.method(GET).path("/info"); + when.method(GET).path(INFO_ENDPOINT); then.status(200) .header("content-type", "application/json") .header("datadog-agent-state", "1") @@ -2156,4 +2387,91 @@ mod single_threaded_tests { "obfuscation must activate when opted in and agent supports" ); } + + /// Agent rollback / partial-V1 scenario: `/info` advertises `/v1.0/traces` but the actual + /// endpoint returns 404 (e.g. customer rolled back the agent without `/info` reflecting it). + /// The fail-closed hook must flip `v1_active` to false on the first 404 so the next send + /// uses V0.4. + #[cfg_attr(miri, ignore)] + #[test] + fn test_v1_404_fails_closed_to_v04() { + agent_info::clear_cache_for_test(); + + let server = MockServer::start(); + + let mock_v1 = server.mock(|when, then| { + when.method(POST).path(V1_TRACES_ENDPOINT); + then.status(404).body(""); + }); + + let mock_v04 = server.mock(|when, then| { + when.method(POST).path(V04_TRACES_ENDPOINT); + then.status(200).body("{}"); + }); + + let _mock_info = server.mock(|when, then| { + when.method(GET).path(INFO_ENDPOINT); + then.status(200) + .header("content-type", "application/json") + .header("datadog-agent-state", "1") + .body(format!( + r#"{{"version":"1","client_drop_p0s":true,"endpoints":["{V1_TRACES_ENDPOINT}","{V04_TRACES_ENDPOINT}"]}}"# + )); + }); + + let runtime = Arc::new(SharedRuntime::new().unwrap()); + + let mut builder = TraceExporter::::builder(); + builder + .set_url(&server.url("/")) + .set_service("test") + .set_env("staging") + .set_tracer_version("v0.1") + .set_language("nodejs") + .set_language_version("1.0") + .set_language_interpreter("v8") + .set_input_format(TraceExporterInputFormat::V04) + .set_shared_runtime(runtime.clone()) + .enable_v1_protocol(); + let exporter = builder.build::().unwrap(); + + // Wait until /info has been fetched so the next send promotes v1_active=true. + let start = std::time::Instant::now(); + while agent_info::get_agent_info().is_none() { + if start.elapsed() > Duration::from_secs(5) { + panic!("timeout waiting for /info"); + } + std::thread::sleep(Duration::from_millis(50)); + } + + let trace_chunk = vec![SpanBytes { + duration: 10, + ..Default::default() + }]; + let data = msgpack_encoder::v04::to_vec(&[trace_chunk]); + + // 1st send: /info has promoted v1_active=true, so this hits /v1.0/traces and 404s. + let result1 = exporter.send(&data); + assert!(result1.is_err(), "first send should error on 404"); + assert!( + !exporter.v1_active.load(Ordering::Relaxed), + "v1_active must flip to false after a V1 404" + ); + + // 2nd send: effective format is now V0.4 → hits /v0.4/traces and succeeds. + let result2 = exporter.send(&data); + assert!( + result2.is_ok(), + "second send (V0.4 fallback) should succeed: {:?}", + result2.err() + ); + + // The first send retries internally on 4xx (send_with_retry default), so V1 is hit + // multiple times before the fail-closed flip; we only care that it was hit at all. + assert!( + mock_v1.calls() >= 1, + "V1 endpoint must be tried at least once before the fail-closed flip" + ); + mock_v04.assert(); + } } diff --git a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs index 445a7f3bb3..fcb91c8dfc 100644 --- a/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs +++ b/libdd-data-pipeline/src/trace_exporter/trace_serializer.rs @@ -36,15 +36,13 @@ pub(super) struct PreparedTracesPayload { #[derive(Debug)] pub(super) struct TraceSerializer { previous_serialised_len: AtomicUsize, - output_format: TraceExporterOutputFormat, } impl TraceSerializer { /// Create a new trace serializer - pub(super) fn new(output_format: TraceExporterOutputFormat) -> Self { + pub(super) fn new() -> Self { Self { previous_serialised_len: AtomicUsize::new(MIN_BUFFER_CAPACITY), - output_format, } } @@ -55,8 +53,9 @@ impl TraceSerializer { header_tags: TracerHeaderTags, metadata: &TracerMetadata, agent_payload_response_version: Option<&AgentResponsePayloadVersion>, + output_format: TraceExporterOutputFormat, ) -> Result { - let payload = self.collect_and_process_traces(traces)?; + let payload = self.collect_and_process_traces(traces, output_format)?; let chunks = payload.size(); let headers = self.build_traces_headers(header_tags, chunks, agent_payload_response_version); @@ -73,18 +72,18 @@ impl TraceSerializer { fn collect_and_process_traces( &self, traces: Vec>>, + output_format: TraceExporterOutputFormat, ) -> Result, TraceExporterError> { - match self.output_format { + let map_err = |e: anyhow::Error| { + TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) + }; + match output_format { TraceExporterOutputFormat::V1 => Ok(tracer_payload::TraceChunks::V1(traces)), TraceExporterOutputFormat::V04 => { - trace_utils::collect_trace_chunks(traces, TraceEncoding::V04).map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - }) + trace_utils::collect_trace_chunks(traces, TraceEncoding::V04).map_err(map_err) } TraceExporterOutputFormat::V05 => { - trace_utils::collect_trace_chunks(traces, TraceEncoding::V05).map_err(|e| { - TraceExporterError::Deserialization(DecodeError::InvalidFormat(e.to_string())) - }) + trace_utils::collect_trace_chunks(traces, TraceEncoding::V05).map_err(map_err) } } } @@ -180,25 +179,16 @@ mod tests { #[test] fn test_trace_serializer_new() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); - assert!(matches!( - serializer.output_format, - TraceExporterOutputFormat::V04 - )); - } - - #[test] - fn test_trace_serializer_new_with_agent_version() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); - assert!(matches!( - serializer.output_format, - TraceExporterOutputFormat::V05 - )); + let serializer = TraceSerializer::new(); + assert_eq!( + serializer.previous_serialised_len.load(Ordering::Relaxed), + MIN_BUFFER_CAPACITY + ); } #[test] fn test_build_traces_headers() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let header_tags = create_test_header_tags(); let headers = serializer.build_traces_headers(header_tags, 3, None); @@ -228,7 +218,7 @@ mod tests { #[test] fn test_build_traces_headers_with_agent_version() { let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let header_tags = create_test_header_tags(); let headers = serializer.build_traces_headers(header_tags, 2, Some(&agent_version)); @@ -239,10 +229,10 @@ mod tests { #[test] fn test_collect_and_process_traces_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let traces = vec![vec![create_test_span()]]; - let result = serializer.collect_and_process_traces(traces); + let result = serializer.collect_and_process_traces(traces, TraceExporterOutputFormat::V04); assert!(result.is_ok()); let payload = result.unwrap(); @@ -252,10 +242,10 @@ mod tests { #[test] fn test_collect_and_process_traces_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); + let serializer = TraceSerializer::new(); let traces = vec![vec![create_test_span()]]; - let result = serializer.collect_and_process_traces(traces); + let result = serializer.collect_and_process_traces(traces, TraceExporterOutputFormat::V05); assert!(result.is_ok()); let payload = result.unwrap(); @@ -265,14 +255,14 @@ mod tests { #[test] fn test_collect_and_process_traces_multiple_chunks() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let traces = vec![ vec![create_test_span()], vec![create_test_span(), create_test_span()], vec![create_test_span()], ]; - let result = serializer.collect_and_process_traces(traces); + let result = serializer.collect_and_process_traces(traces, TraceExporterOutputFormat::V04); assert!(result.is_ok()); let payload = result.unwrap(); @@ -281,10 +271,10 @@ mod tests { #[test] fn test_serialize_payload_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let original_traces = vec![vec![create_test_span()]]; let payload = serializer - .collect_and_process_traces(original_traces.clone()) + .collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V04) .unwrap(); let result = serializer.serialize_payload(&payload, &TracerMetadata::default()); @@ -316,10 +306,10 @@ mod tests { #[test] fn test_serialize_payload_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); + let serializer = TraceSerializer::new(); let original_traces = vec![vec![create_test_span()]]; let payload = serializer - .collect_and_process_traces(original_traces.clone()) + .collect_and_process_traces(original_traces.clone(), TraceExporterOutputFormat::V05) .unwrap(); let result = serializer.serialize_payload(&payload, &TracerMetadata::default()); @@ -351,7 +341,7 @@ mod tests { #[test] fn test_prepare_traces_payload_v04() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let traces = vec![ vec![create_test_span()], vec![create_test_span(), create_test_span()], @@ -363,6 +353,7 @@ mod tests { header_tags, &TracerMetadata::default(), None, + TraceExporterOutputFormat::V04, ); assert!(result.is_ok()); @@ -378,7 +369,7 @@ mod tests { #[test] fn test_prepare_traces_payload_v05() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V05); + let serializer = TraceSerializer::new(); let traces = vec![vec![create_test_span()]]; let header_tags = create_test_header_tags(); @@ -387,6 +378,7 @@ mod tests { header_tags, &TracerMetadata::default(), None, + TraceExporterOutputFormat::V05, ); assert!(result.is_ok()); @@ -399,7 +391,7 @@ mod tests { #[test] fn test_prepare_traces_payload_with_agent_version() { let agent_version = AgentResponsePayloadVersion::new(); - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let traces = vec![vec![create_test_span()]]; let header_tags = create_test_header_tags(); @@ -408,6 +400,7 @@ mod tests { header_tags, &TracerMetadata::default(), Some(&agent_version), + TraceExporterOutputFormat::V04, ); assert!(result.is_ok()); @@ -418,7 +411,7 @@ mod tests { #[test] fn test_prepare_traces_payload_empty_traces() { - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let traces: Vec> = vec![]; let header_tags = create_test_header_tags(); @@ -427,6 +420,7 @@ mod tests { header_tags, &TracerMetadata::default(), None, + TraceExporterOutputFormat::V04, ); assert!(result.is_ok()); @@ -449,7 +443,7 @@ mod tests { ..Default::default() }; - let serializer = TraceSerializer::new(TraceExporterOutputFormat::V04); + let serializer = TraceSerializer::new(); let headers = serializer.build_traces_headers(header_tags, 1, None); assert_eq!(headers.get("datadog-meta-lang").unwrap(), "python"); diff --git a/libdd-data-pipeline/tests/test_trace_exporter.rs b/libdd-data-pipeline/tests/test_trace_exporter.rs index ed6ba60465..7ff0379e9f 100644 --- a/libdd-data-pipeline/tests/test_trace_exporter.rs +++ b/libdd-data-pipeline/tests/test_trace_exporter.rs @@ -314,6 +314,17 @@ mod tracing_integration_tests { let data = get_v04_to_v1_trace_snapshot_test_payload("test_exporter_v04_v1_snapshot"); + // V1 is gated by /info negotiation (fail-closed). Wait until the background + // fetcher has populated agent_info so the first send promotes v1_active=true + // and the payload is encoded as V1. + let start = std::time::Instant::now(); + while libdd_data_pipeline::agent_info::get_agent_info().is_none() { + if start.elapsed() > std::time::Duration::from_secs(5) { + panic!("timeout waiting for /info"); + } + std::thread::sleep(std::time::Duration::from_millis(50)); + } + let response = trace_exporter.send(data.as_ref()); assert!(response.is_ok(), "send failed: {:?}", response.err()); })