Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2674769
feat(sidecar): forward FFE exposures to agent EVP proxy
leoromanovsky Apr 23, 2026
1d8e284
chore(sidecar): keep FFE flusher comments ASCII
leoromanovsky May 22, 2026
3ac1d75
fix(sidecar): use current HTTP capability API for FFE
leoromanovsky May 22, 2026
6ac16b3
fix(sidecar): avoid test-only HTTP client import warning
leoromanovsky May 22, 2026
a1afaae
chore(sidecar): add FFE flusher codeowners
leoromanovsky May 22, 2026
a649b7f
test(ffe): skip EVP mock server tests under miri
leoromanovsky May 22, 2026
2976233
feat(sidecar): forward FFE evaluation metrics to OTLP HTTP intake
leoromanovsky May 23, 2026
875ec8f
fix(sidecar): dispatch FFE actions before application-entry check
leoromanovsky May 23, 2026
e7beb1c
chore(sidecar): rename ffe_flusher → ffe_exposures_flusher
leoromanovsky May 23, 2026
6650cc2
chore(sidecar): rustfmt after ffe_flusher → ffe_exposures_flusher rename
leoromanovsky May 23, 2026
2389cab
docs(sidecar): add FFE forwarder system diagram for PR 2026
leoromanovsky May 23, 2026
97c57e5
test(sidecar): cover FFE dispatch before app registration
leoromanovsky May 24, 2026
a16e7cc
chore(ffe): remove generated sidecar docs
leoromanovsky May 24, 2026
aa12a2d
fix(sidecar): reuse FFE HTTP client and enforce timeouts
leoromanovsky May 27, 2026
11916e3
fix(sidecar): satisfy FFE flusher clippy
leoromanovsky May 27, 2026
717cf7b
fix(sidecar): address FFE forwarding review
leoromanovsky May 27, 2026
45174a1
feat(sidecar): handle structured FFE telemetry
leoromanovsky May 27, 2026
c8577c0
Merge remote-tracking branch 'origin/main' into leo.romanovsky/ffe-si…
leoromanovsky May 28, 2026
a7726a9
fix(profiling): avoid newer integer helper in profiler
leoromanovsky May 28, 2026
f8a8ea9
fix(profiling): allow modulo compatibility lint
leoromanovsky May 28, 2026
6ed96c2
fix(obfuscation): avoid const String bytes for older toolchains
leoromanovsky May 28, 2026
f7a608a
chore(sidecar): keep FFE exposure PR focused
leoromanovsky May 28, 2026
6d23848
Merge remote-tracking branch 'origin/main' into leo.romanovsky/ffe-si…
leoromanovsky May 28, 2026
635d344
Move FFE exposure logic into datadog-ffe
leoromanovsky May 28, 2026
8be471f
Rename FFE exposure feature gate
leoromanovsky May 28, 2026
a570bb9
fix(sidecar-ffi): validate FFE exposure slice first
leoromanovsky May 29, 2026
f580e2d
fix(sidecar-ffi): catch FFE exposure FFI panics
leoromanovsky May 29, 2026
f4c72c6
docs(ffe): explain exposure cache limit
leoromanovsky May 29, 2026
e42dc5c
fix(ffe): log invalid exposure attributes
leoromanovsky May 29, 2026
d780e49
docs(ffe): document empty exposure subjects
leoromanovsky May 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@ tools/sidecar_mockgen/ @DataDog/libdatadog-php
libdd-data-pipeline/src/otlp/ @DataDog/apm-sdk-capabilities-rust
libdd-data-pipeline/tests/test_trace_exporter_otlp_export.rs @DataDog/apm-sdk-capabilities-rust
libdd-trace-utils/src/otlp_encoder/ @DataDog/apm-sdk-capabilities-rust
datadog-sidecar/src/service/ffe_exposures_flusher.rs @DataDog/libdatadog-php @DataDog/libdatadog-apm @DataDog/feature-flagging-and-experimentation-sdk
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datadog-ffe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ serde-bool = { version = "0.1.3", default-features = false }
serde_with = { version = "3.11.0", default-features = false, features = ["base64", "hex", "macros"] }
thiserror = { version = "2.0.3", default-features = false }
url = { version = "2.5.0", default-features = false, features = ["std"] }
lru = { version = "0.16.3", optional = true }
pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] }

[features]
exposure-events = ["dep:lru"]
pyo3 = ["dep:pyo3"]
2 changes: 2 additions & 0 deletions datadog-ffe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@
mod flag_type;

pub mod rules_based;
#[cfg(feature = "exposure-events")]
pub mod telemetry;

pub use flag_type::{ExpectedFlagType, FlagType};
335 changes: 335 additions & 0 deletions datadog-ffe/src/telemetry/exposures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Reusable FFE exposure payload and deduplication primitives.

use super::FfeTelemetryContext;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

// Keep the default aligned with existing server SDK exposure caches: large
// enough for common per-process hot sets, but still bounded in sidecar memory.
const DEFAULT_CACHE_LIMIT: usize = 65_536;
Comment thread
leoromanovsky marked this conversation as resolved.

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct FfeExposureBatch {
pub context: FfeTelemetryContext,
pub exposures: Vec<FfeExposure>,
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct FfeExposure {
pub timestamp_ms: u64,
pub flag_key: String,
/// Empty subject ids are preserved intentionally. SDKs may evaluate with an
/// empty targeting key, and a doLog=true exposure should still be emitted.
pub subject_id: String,
/// JSON object encoded by the tracer. Invalid or non-object JSON is treated
/// as an empty object during EVP payload serialization.
pub subject_attributes_json: String,
pub allocation_key: String,
pub variant: String,
}

#[derive(Clone)]
pub struct ExposureDeduplicator {
cache: Arc<Mutex<LruCache<ExposureCacheKey, ExposureCacheValue>>>,
}

impl Default for ExposureDeduplicator {
fn default() -> Self {
Self::new(DEFAULT_CACHE_LIMIT)
}
}

impl ExposureDeduplicator {
pub fn new(limit: usize) -> Self {
let limit = NonZeroUsize::new(limit).unwrap_or(NonZeroUsize::MIN);
Self {
cache: Arc::new(Mutex::new(LruCache::new(limit))),
}
}

pub fn should_send(&self, context: &FfeTelemetryContext, exposure: &FfeExposure) -> bool {
let key = ExposureCacheKey {
service: context.service.clone(),
env: context.env.clone(),
version: context.version.clone(),
flag_key: exposure.flag_key.clone(),
subject_id: exposure.subject_id.clone(),
};
let value = ExposureCacheValue {
allocation_key: exposure.allocation_key.clone(),
variant: exposure.variant.clone(),
};

let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
Comment thread
leoromanovsky marked this conversation as resolved.
if cache.get(&key).is_some_and(|cached| cached == &value) {
return false;
}

cache.put(key, value);
true
}
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct ExposureCacheKey {
service: String,
env: String,
version: String,
flag_key: String,
subject_id: String,
}

#[derive(Clone, Debug, Eq, PartialEq)]
struct ExposureCacheValue {
allocation_key: String,
variant: String,
}

pub fn encode_exposure_batch(
deduplicator: &ExposureDeduplicator,
batch: FfeExposureBatch,
) -> Result<Option<String>, serde_json::Error> {
let exposures = batch
.exposures
.into_iter()
.filter(is_complete)
.filter(|exposure| deduplicator.should_send(&batch.context, exposure))
.map(ExposureEvent::from)
.collect::<Vec<_>>();

if exposures.is_empty() {
return Ok(None);
}

let payload = ExposurePayload {
context: ExposurePayloadContext::from(batch.context),
exposures,
};
serde_json::to_string(&payload).map(Some)
}

fn is_complete(exposure: &FfeExposure) -> bool {
// `subject_id` is intentionally not required here; empty targeting keys
// are valid evaluation inputs and must not suppress exposure logging.
!exposure.flag_key.is_empty()
&& !exposure.allocation_key.is_empty()
&& !exposure.variant.is_empty()
Comment thread
leoromanovsky marked this conversation as resolved.
}

#[derive(Serialize)]
struct ExposurePayload {
context: ExposurePayloadContext,
exposures: Vec<ExposureEvent>,
}

#[derive(Serialize)]
struct ExposurePayloadContext {
#[serde(skip_serializing_if = "String::is_empty")]
service: String,
#[serde(skip_serializing_if = "String::is_empty")]
env: String,
#[serde(skip_serializing_if = "String::is_empty")]
version: String,
}

impl From<FfeTelemetryContext> for ExposurePayloadContext {
fn from(value: FfeTelemetryContext) -> Self {
Self {
service: value.service,
env: value.env,
version: value.version,
}
}
}

#[derive(Serialize)]
struct ExposureEvent {
timestamp: u64,
allocation: Key,
flag: Key,
variant: Key,
subject: Subject,
}

impl From<FfeExposure> for ExposureEvent {
fn from(value: FfeExposure) -> Self {
Self {
timestamp: value.timestamp_ms,
allocation: Key {
key: value.allocation_key,
},
flag: Key {
key: value.flag_key,
},
variant: Key { key: value.variant },
subject: Subject {
id: value.subject_id,
attributes: subject_attributes(&value.subject_attributes_json),
},
}
}
}

#[derive(Serialize)]
struct Key {
key: String,
}

#[derive(Serialize)]
struct Subject {
id: String,
attributes: serde_json::Map<String, serde_json::Value>,
}

fn subject_attributes(json: &str) -> serde_json::Map<String, serde_json::Value> {
if json.is_empty() {
return serde_json::Map::new();
}

match serde_json::from_str::<serde_json::Value>(json) {
Ok(serde_json::Value::Object(attrs)) => attrs,
Ok(_) => {
log::debug!(
"ffe exposure subject attributes must be a JSON object; using empty attributes"
);
serde_json::Map::new()
}
Err(error) => {
log::debug!(
"invalid ffe exposure subject attributes JSON: {error}; using empty attributes"
);
serde_json::Map::new()
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::Value;

fn context() -> FfeTelemetryContext {
FfeTelemetryContext {
service: "svc".to_owned(),
env: "prod".to_owned(),
version: "1".to_owned(),
}
}

fn exposure(subject_id: &str, allocation_key: &str, variant: &str) -> FfeExposure {
FfeExposure {
timestamp_ms: 123,
flag_key: "flag".to_owned(),
subject_id: subject_id.to_owned(),
subject_attributes_json: r#"{"tier":"premium"}"#.to_owned(),
allocation_key: allocation_key.to_owned(),
variant: variant.to_owned(),
}
}

#[test]
fn encodes_structured_batch_and_preserves_empty_subject() {
let deduplicator = ExposureDeduplicator::new(4);
let payload = encode_exposure_batch(
&deduplicator,
FfeExposureBatch {
context: context(),
exposures: vec![exposure("", "alloc", "variant")],
},
)
.unwrap()
.unwrap();
let payload: Value = serde_json::from_str(&payload).unwrap();

assert_eq!(payload["context"]["service"], "svc");
assert_eq!(payload["context"]["env"], "prod");
assert_eq!(payload["context"]["version"], "1");
assert_eq!(payload["exposures"][0]["subject"]["id"], "");
assert_eq!(
payload["exposures"][0]["subject"]["attributes"]["tier"],
"premium"
);
}

#[test]
fn deduplicates_same_assignment_and_emits_changed_assignment() {
let deduplicator = ExposureDeduplicator::new(4);
let first = encode_exposure_batch(
&deduplicator,
FfeExposureBatch {
context: context(),
exposures: vec![exposure("user", "alloc-a", "a")],
},
)
.unwrap();
let duplicate = encode_exposure_batch(
&deduplicator,
FfeExposureBatch {
context: context(),
exposures: vec![exposure("user", "alloc-a", "a")],
},
)
.unwrap();
let changed = encode_exposure_batch(
&deduplicator,
FfeExposureBatch {
context: context(),
exposures: vec![exposure("user", "alloc-b", "b")],
},
)
.unwrap();

assert!(first.is_some());
assert!(duplicate.is_none());
assert!(changed.is_some());
}

#[test]
fn cache_key_includes_service_env_and_version() {
let deduplicator = ExposureDeduplicator::new(4);
let first = encode_exposure_batch(
&deduplicator,
FfeExposureBatch {
context: context(),
exposures: vec![exposure("user", "alloc", "variant")],
},
)
.unwrap();
let other_service = encode_exposure_batch(
&deduplicator,
FfeExposureBatch {
context: FfeTelemetryContext {
service: "other".to_owned(),
..context()
},
exposures: vec![exposure("user", "alloc", "variant")],
},
)
.unwrap();

assert!(first.is_some());
assert!(other_service.is_some());
}

#[test]
fn drops_incomplete_exposures() {
let deduplicator = ExposureDeduplicator::new(4);
let mut invalid = exposure("user", "alloc", "variant");
invalid.allocation_key.clear();

assert!(encode_exposure_batch(
&deduplicator,
FfeExposureBatch {
context: context(),
exposures: vec![invalid],
},
)
.unwrap()
.is_none());
}
}
13 changes: 13 additions & 0 deletions datadog-ffe/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

pub mod exposures;

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct FfeTelemetryContext {
pub service: String,
pub env: String,
pub version: String,
}
Loading
Loading