From 8c4eaf8e3acf417d781410c94537f9861019fbea Mon Sep 17 00:00:00 2001 From: Munir Date: Fri, 27 Mar 2026 18:24:55 -0400 Subject: [PATCH 1/6] feat(telemetry): add support for session and root session id --- libdd-telemetry-ffi/src/builder/expanded.rs | 122 ++++++++++++++++++++ libdd-telemetry-ffi/src/builder/macros.rs | 4 +- libdd-telemetry-ffi/src/lib.rs | 37 ++++++ libdd-telemetry/src/worker/http_client.rs | 40 ++++++- libdd-telemetry/src/worker/mod.rs | 93 ++++++++++++++- 5 files changed, 293 insertions(+), 3 deletions(-) diff --git a/libdd-telemetry-ffi/src/builder/expanded.rs b/libdd-telemetry-ffi/src/builder/expanded.rs index 74daf6e8c0..ad601239e9 100644 --- a/libdd-telemetry-ffi/src/builder/expanded.rs +++ b/libdd-telemetry-ffi/src/builder/expanded.rs @@ -245,6 +245,50 @@ mod macros { }; ffi::MaybeError::None } + #[no_mangle] + pub unsafe extern "C" fn ddog_telemetry_builder_with_str_telemetry_session_id( + telemetry_builder: &mut TelemetryWorkerBuilder, + param: ffi::CharSlice, + ) -> ffi::MaybeError { + telemetry_builder.telemetry_session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + ffi::MaybeError::None + } + #[no_mangle] + pub unsafe extern "C" fn ddog_telemetry_builder_with_str_telemetry_root_session_id( + telemetry_builder: &mut TelemetryWorkerBuilder, + param: ffi::CharSlice, + ) -> ffi::MaybeError { + telemetry_builder.telemetry_root_session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + ffi::MaybeError::None + } #[repr(C)] #[allow(dead_code)] pub enum TelemetryWorkerBuilderStrProperty { @@ -259,6 +303,8 @@ mod macros { HostKernelRelease, HostKernelVersion, RuntimeId, + TelemetrySessionId, + TelemetryRootSessionId, } #[no_mangle] /** @@ -288,6 +334,10 @@ mod macros { * runtime_id + * telemetry_session_id + + * telemetry_root_session_id + */ pub unsafe extern "C" fn ddog_telemetry_builder_with_property_str( telemetry_builder: &mut TelemetryWorkerBuilder, @@ -481,6 +531,40 @@ mod macros { } }; } + TelemetrySessionId => { + telemetry_builder.telemetry_session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + } + TelemetryRootSessionId => { + telemetry_builder.telemetry_root_session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + } } ffi::MaybeError::None } @@ -512,6 +596,10 @@ mod macros { * runtime_id + * telemetry_session_id + + * telemetry_root_session_id + */ pub unsafe extern "C" fn ddog_telemetry_builder_with_str_named_property( telemetry_builder: &mut TelemetryWorkerBuilder, @@ -715,6 +803,40 @@ mod macros { } }; } + "telemetry_session_id" => { + telemetry_builder.telemetry_session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + } + "telemetry_root_session_id" => { + telemetry_builder.telemetry_root_session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + } _ => return ffi::MaybeError::None, } ffi::MaybeError::None diff --git a/libdd-telemetry-ffi/src/builder/macros.rs b/libdd-telemetry-ffi/src/builder/macros.rs index 4c9377b44d..dc25a00a4d 100644 --- a/libdd-telemetry-ffi/src/builder/macros.rs +++ b/libdd-telemetry-ffi/src/builder/macros.rs @@ -36,7 +36,9 @@ crate::c_setters!( host.kernel_release, host.kernel_version, - runtime_id + runtime_id, + telemetry_session_id, + telemetry_root_session_id, } ); diff --git a/libdd-telemetry-ffi/src/lib.rs b/libdd-telemetry-ffi/src/lib.rs index fde98bedaf..90971a82f9 100644 --- a/libdd-telemetry-ffi/src/lib.rs +++ b/libdd-telemetry-ffi/src/lib.rs @@ -222,6 +222,43 @@ mod tests { } } + #[test] + #[cfg_attr(miri, ignore)] + fn test_telemetry_session_id_ffi_setters_match_runtime_id_option_string() { + unsafe { + let mut builder: MaybeUninit> = MaybeUninit::uninit(); + assert_eq!( + ddog_telemetry_builder_instantiate( + NonNull::new(&mut builder).unwrap().cast(), + ffi::CharSlice::from("service_name"), + ffi::CharSlice::from("language_name"), + ffi::CharSlice::from("language_version"), + ffi::CharSlice::from("tracer_version"), + ), + MaybeError::None + ); + let mut builder = builder.assume_init(); + + assert_eq!( + ddog_telemetry_builder_with_str_telemetry_session_id( + &mut builder, + ffi::CharSlice::from("sess-1"), + ), + MaybeError::None, + ); + assert_eq!(builder.telemetry_session_id, Some("sess-1".into())); + + assert_eq!( + ddog_telemetry_builder_with_str_telemetry_root_session_id( + &mut builder, + ffi::CharSlice::from("root-9"), + ), + MaybeError::None, + ); + assert_eq!(builder.telemetry_root_session_id, Some("root-9".into())); + } + } + #[test] #[cfg_attr(miri, ignore)] fn test_worker_run() { diff --git a/libdd-telemetry/src/worker/http_client.rs b/libdd-telemetry/src/worker/http_client.rs index 5f00744b12..c459bc48cd 100644 --- a/libdd-telemetry/src/worker/http_client.rs +++ b/libdd-telemetry/src/worker/http_client.rs @@ -12,6 +12,7 @@ use std::{ }; use crate::config::Config; +use http::HeaderValue; use tracing::{debug, error}; pub mod header { @@ -22,8 +23,45 @@ pub mod header { pub const LIBRARY_LANGUAGE: HeaderName = HeaderName::from_static("dd-client-library-language"); pub const LIBRARY_VERSION: HeaderName = HeaderName::from_static("dd-client-library-version"); - /// Header key for whether to enable debug mode of telemetry. pub const DEBUG_ENABLED: HeaderName = HeaderName::from_static("dd-telemetry-debug-enabled"); + + pub const DD_SESSION_ID: HeaderName = HeaderName::from_static("dd-session-id"); + pub const DD_ROOT_SESSION_ID: HeaderName = HeaderName::from_static("dd-root-session-id"); +} + +pub(crate) fn instrumentation_session_headers<'a>( + telemetry_session_id: Option<&'a str>, + runtime_id: &'a str, + telemetry_root_session_id: Option<&'a str>, +) -> (&'a str, Option<&'a str>) { + let session_override = telemetry_session_id.filter(|s| !s.is_empty()); + let session = session_override.unwrap_or(runtime_id); + let root = telemetry_root_session_id + .filter(|s| !s.is_empty()) + .filter(|r| *r != session); + (session, root) +} + +pub(crate) fn add_instrumentation_session_headers( + builder: HttpRequestBuilder, + telemetry_session_id: Option<&str>, + runtime_id: &str, + telemetry_root_session_id: Option<&str>, +) -> anyhow::Result { + let (session, root) = instrumentation_session_headers( + telemetry_session_id, + runtime_id, + telemetry_root_session_id, + ); + let session_hv = + HeaderValue::from_str(session).map_err(|e| anyhow::anyhow!("invalid dd-session-id: {e}"))?; + let mut b = builder.header(header::DD_SESSION_ID, session_hv); + if let Some(root_id) = root { + let root_hv = HeaderValue::from_str(root_id) + .map_err(|e| anyhow::anyhow!("invalid dd-root-session-id: {e}"))?; + b = b.header(header::DD_ROOT_SESSION_ID, root_hv); + } + Ok(b) } pub type ResponseFuture = diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 3bfa1bcccb..b3263c4ad5 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -136,6 +136,8 @@ pub struct TelemetryWorker { cancellation_token: CancellationToken, seq_id: AtomicU64, runtime_id: String, + telemetry_session_id: Option, + telemetry_root_session_id: Option, client: Box, metrics_flush_interval: Duration, deadlines: scheduler::Scheduler, @@ -150,6 +152,8 @@ impl Debug for TelemetryWorker { .field("cancellation_token", &self.cancellation_token) .field("seq_id", &self.seq_id) .field("runtime_id", &self.runtime_id) + .field("telemetry_session_id", &self.telemetry_session_id) + .field("telemetry_root_session_id", &self.telemetry_root_session_id) .field("metrics_flush_interval", &self.metrics_flush_interval) .field("deadlines", &self.deadlines) .field("data", &self.data) @@ -765,11 +769,25 @@ impl TelemetryWorker { http_client::header::LIBRARY_VERSION, tel.application.tracer_version.clone(), ); + let req = http_client::add_instrumentation_session_headers( + req, + self.telemetry_session_id.as_deref(), + &self.runtime_id, + self.telemetry_root_session_id.as_deref(), + )?; let body = http_common::Body::from(serialize::serialize(&tel)?); Ok(req.body(body)?) } + #[cfg(test)] + pub(crate) fn test_build_request( + &self, + payload: &data::Payload, + ) -> anyhow::Result { + self.build_request(payload) + } + async fn send_request( &self, req: http_common::HttpRequest, @@ -1033,6 +1051,8 @@ pub struct TelemetryWorkerBuilder { pub host: Host, pub application: Application, pub runtime_id: Option, + pub telemetry_session_id: Option, + pub telemetry_root_session_id: Option, pub dependencies: store::Store, pub integrations: store::Store, pub configurations: store::Store, @@ -1084,6 +1104,8 @@ impl TelemetryWorkerBuilder { ..Default::default() }, runtime_id: None, + telemetry_session_id: None, + telemetry_root_session_id: None, dependencies: store::Store::new(MAX_ITEMS), integrations: store::Store::new(MAX_ITEMS), configurations: store::Store::new(MAX_ITEMS), @@ -1134,6 +1156,8 @@ impl TelemetryWorkerBuilder { runtime_id: self .runtime_id .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), + telemetry_session_id: self.telemetry_session_id, + telemetry_root_session_id: self.telemetry_root_session_id, client, metrics_flush_interval, deadlines: scheduler::Scheduler::new(vec![ @@ -1190,11 +1214,78 @@ impl TelemetryWorkerBuilder { #[cfg(test)] mod tests { - use crate::worker::TelemetryWorkerHandle; + use crate::data::Payload; + use crate::worker::http_client::header::{DD_ROOT_SESSION_ID, DD_SESSION_ID}; + use crate::worker::{TelemetryWorker, TelemetryWorkerBuilder, TelemetryWorkerHandle}; + use libdd_common::Endpoint; + use tokio::runtime::Runtime; fn is_send(_: T) {} fn is_sync(_: T) {} + fn worker_with_sessions( + runtime_id: Option, + telemetry_session_id: Option, + telemetry_root_session_id: Option, + ) -> TelemetryWorker { + let mut b = TelemetryWorkerBuilder::new( + "h".into(), + "svc".into(), + "lang".into(), + "1".into(), + "tv".into(), + ); + b.config + .set_endpoint(Endpoint::from_slice("http://127.0.0.1:1")) + .unwrap(); + b.runtime_id = runtime_id; + b.telemetry_session_id = telemetry_session_id; + b.telemetry_root_session_id = telemetry_root_session_id; + let rt = Runtime::new().unwrap(); + b.build_worker(rt.handle().clone()).1 + } + + #[test] + fn telemetry_http_request_always_includes_dd_session_id_from_runtime_id() { + let worker = worker_with_sessions(Some("run-9abc".into()), None, None); + let req = worker + .test_build_request(&Payload::AppHeartbeat(())) + .unwrap(); + assert_eq!( + req.headers() + .get(DD_SESSION_ID) + .expect("dd-session-id") + .to_str() + .unwrap(), + "run-9abc" + ); + assert!(req.headers().get(DD_ROOT_SESSION_ID).is_none()); + } + + #[test] + fn telemetry_http_request_includes_dd_root_session_id_when_set_and_distinct() { + let worker = worker_with_sessions( + Some("run-9abc".into()), + Some("sess-child".into()), + Some("sess-root".into()), + ); + let req = worker + .test_build_request(&Payload::AppHeartbeat(())) + .unwrap(); + assert_eq!( + req.headers().get(DD_SESSION_ID).unwrap().to_str().unwrap(), + "sess-child" + ); + assert_eq!( + req.headers() + .get(DD_ROOT_SESSION_ID) + .expect("dd-root-session-id") + .to_str() + .unwrap(), + "sess-root" + ); + } + #[test] fn test_handle_sync_send() { #[allow(clippy::redundant_closure)] From 63dcb21910dc8c1027c3e8a463a56670176c1999 Mon Sep 17 00:00:00 2001 From: Munir Date: Fri, 27 Mar 2026 19:22:45 -0400 Subject: [PATCH 2/6] add parent_id support and clean up tests --- libdd-telemetry-ffi/src/builder/expanded.rs | 97 +++++++++++--- libdd-telemetry-ffi/src/builder/macros.rs | 5 +- libdd-telemetry-ffi/src/lib.rs | 19 ++- libdd-telemetry/src/worker/http_client.rs | 51 +++----- libdd-telemetry/src/worker/mod.rs | 132 +++++++++++--------- 5 files changed, 185 insertions(+), 119 deletions(-) diff --git a/libdd-telemetry-ffi/src/builder/expanded.rs b/libdd-telemetry-ffi/src/builder/expanded.rs index ad601239e9..af3d6facd9 100644 --- a/libdd-telemetry-ffi/src/builder/expanded.rs +++ b/libdd-telemetry-ffi/src/builder/expanded.rs @@ -246,11 +246,11 @@ mod macros { ffi::MaybeError::None } #[no_mangle] - pub unsafe extern "C" fn ddog_telemetry_builder_with_str_telemetry_session_id( + pub unsafe extern "C" fn ddog_telemetry_builder_with_str_session_id( telemetry_builder: &mut TelemetryWorkerBuilder, param: ffi::CharSlice, ) -> ffi::MaybeError { - telemetry_builder.telemetry_session_id = + telemetry_builder.session_id = match (|s: ffi::CharSlice| -> Result<_, String> { Ok(Some(s.to_utf8_lossy().into_owned())) })(param) @@ -268,11 +268,33 @@ mod macros { ffi::MaybeError::None } #[no_mangle] - pub unsafe extern "C" fn ddog_telemetry_builder_with_str_telemetry_root_session_id( + pub unsafe extern "C" fn ddog_telemetry_builder_with_str_root_session_id( telemetry_builder: &mut TelemetryWorkerBuilder, param: ffi::CharSlice, ) -> ffi::MaybeError { - telemetry_builder.telemetry_root_session_id = + telemetry_builder.root_session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + ffi::MaybeError::None + } + #[no_mangle] + pub unsafe extern "C" fn ddog_telemetry_builder_with_str_parent_session_id( + telemetry_builder: &mut TelemetryWorkerBuilder, + param: ffi::CharSlice, + ) -> ffi::MaybeError { + telemetry_builder.parent_session_id = match (|s: ffi::CharSlice| -> Result<_, String> { Ok(Some(s.to_utf8_lossy().into_owned())) })(param) @@ -303,8 +325,9 @@ mod macros { HostKernelRelease, HostKernelVersion, RuntimeId, - TelemetrySessionId, - TelemetryRootSessionId, + SessionId, + RootSessionId, + ParentSessionId, } #[no_mangle] /** @@ -334,9 +357,11 @@ mod macros { * runtime_id - * telemetry_session_id + * session_id - * telemetry_root_session_id + * root_session_id + + * parent_session_id */ pub unsafe extern "C" fn ddog_telemetry_builder_with_property_str( @@ -531,8 +556,25 @@ mod macros { } }; } - TelemetrySessionId => { - telemetry_builder.telemetry_session_id = + SessionId => { + telemetry_builder.session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + } + RootSessionId => { + telemetry_builder.root_session_id = match (|s: ffi::CharSlice| -> Result<_, String> { Ok(Some(s.to_utf8_lossy().into_owned())) })(param) @@ -548,8 +590,8 @@ mod macros { } }; } - TelemetryRootSessionId => { - telemetry_builder.telemetry_root_session_id = + ParentSessionId => { + telemetry_builder.parent_session_id = match (|s: ffi::CharSlice| -> Result<_, String> { Ok(Some(s.to_utf8_lossy().into_owned())) })(param) @@ -596,9 +638,11 @@ mod macros { * runtime_id - * telemetry_session_id + * session_id - * telemetry_root_session_id + * root_session_id + + * parent_session_id */ pub unsafe extern "C" fn ddog_telemetry_builder_with_str_named_property( @@ -803,8 +847,25 @@ mod macros { } }; } - "telemetry_session_id" => { - telemetry_builder.telemetry_session_id = + "session_id" => { + telemetry_builder.session_id = + match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; + } + "root_session_id" => { + telemetry_builder.root_session_id = match (|s: ffi::CharSlice| -> Result<_, String> { Ok(Some(s.to_utf8_lossy().into_owned())) })(param) @@ -820,8 +881,8 @@ mod macros { } }; } - "telemetry_root_session_id" => { - telemetry_builder.telemetry_root_session_id = + "parent_session_id" => { + telemetry_builder.parent_session_id = match (|s: ffi::CharSlice| -> Result<_, String> { Ok(Some(s.to_utf8_lossy().into_owned())) })(param) diff --git a/libdd-telemetry-ffi/src/builder/macros.rs b/libdd-telemetry-ffi/src/builder/macros.rs index dc25a00a4d..150b23fcb8 100644 --- a/libdd-telemetry-ffi/src/builder/macros.rs +++ b/libdd-telemetry-ffi/src/builder/macros.rs @@ -37,8 +37,9 @@ crate::c_setters!( host.kernel_version, runtime_id, - telemetry_session_id, - telemetry_root_session_id, + session_id, + root_session_id, + parent_session_id, } ); diff --git a/libdd-telemetry-ffi/src/lib.rs b/libdd-telemetry-ffi/src/lib.rs index 90971a82f9..f7df611b83 100644 --- a/libdd-telemetry-ffi/src/lib.rs +++ b/libdd-telemetry-ffi/src/lib.rs @@ -224,7 +224,7 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] - fn test_telemetry_session_id_ffi_setters_match_runtime_id_option_string() { + fn test_session_id_and_root_session_id_ffi_setters_match_runtime_id_option_string() { unsafe { let mut builder: MaybeUninit> = MaybeUninit::uninit(); assert_eq!( @@ -240,22 +240,31 @@ mod tests { let mut builder = builder.assume_init(); assert_eq!( - ddog_telemetry_builder_with_str_telemetry_session_id( + ddog_telemetry_builder_with_str_session_id( &mut builder, ffi::CharSlice::from("sess-1"), ), MaybeError::None, ); - assert_eq!(builder.telemetry_session_id, Some("sess-1".into())); + assert_eq!(builder.session_id, Some("sess-1".into())); assert_eq!( - ddog_telemetry_builder_with_str_telemetry_root_session_id( + ddog_telemetry_builder_with_str_root_session_id( &mut builder, ffi::CharSlice::from("root-9"), ), MaybeError::None, ); - assert_eq!(builder.telemetry_root_session_id, Some("root-9".into())); + assert_eq!(builder.root_session_id, Some("root-9".into())); + + assert_eq!( + ddog_telemetry_builder_with_str_parent_session_id( + &mut builder, + ffi::CharSlice::from("parent-2"), + ), + MaybeError::None, + ); + assert_eq!(builder.parent_session_id, Some("parent-2".into())); } } diff --git a/libdd-telemetry/src/worker/http_client.rs b/libdd-telemetry/src/worker/http_client.rs index c459bc48cd..1c60b19d9c 100644 --- a/libdd-telemetry/src/worker/http_client.rs +++ b/libdd-telemetry/src/worker/http_client.rs @@ -12,7 +12,6 @@ use std::{ }; use crate::config::Config; -use http::HeaderValue; use tracing::{debug, error}; pub mod header { @@ -27,41 +26,29 @@ pub mod header { pub const DD_SESSION_ID: HeaderName = HeaderName::from_static("dd-session-id"); pub const DD_ROOT_SESSION_ID: HeaderName = HeaderName::from_static("dd-root-session-id"); -} - -pub(crate) fn instrumentation_session_headers<'a>( - telemetry_session_id: Option<&'a str>, - runtime_id: &'a str, - telemetry_root_session_id: Option<&'a str>, -) -> (&'a str, Option<&'a str>) { - let session_override = telemetry_session_id.filter(|s| !s.is_empty()); - let session = session_override.unwrap_or(runtime_id); - let root = telemetry_root_session_id - .filter(|s| !s.is_empty()) - .filter(|r| *r != session); - (session, root) + pub const DD_PARENT_SESSION_ID: HeaderName = HeaderName::from_static("dd-parent-session-id"); } pub(crate) fn add_instrumentation_session_headers( - builder: HttpRequestBuilder, - telemetry_session_id: Option<&str>, - runtime_id: &str, - telemetry_root_session_id: Option<&str>, -) -> anyhow::Result { - let (session, root) = instrumentation_session_headers( - telemetry_session_id, - runtime_id, - telemetry_root_session_id, - ); - let session_hv = - HeaderValue::from_str(session).map_err(|e| anyhow::anyhow!("invalid dd-session-id: {e}"))?; - let mut b = builder.header(header::DD_SESSION_ID, session_hv); - if let Some(root_id) = root { - let root_hv = HeaderValue::from_str(root_id) - .map_err(|e| anyhow::anyhow!("invalid dd-root-session-id: {e}"))?; - b = b.header(header::DD_ROOT_SESSION_ID, root_hv); + mut builder: HttpRequestBuilder, + session_id: Option<&str>, + root_session_id: Option<&str>, + parent_session_id: Option<&str>, +) -> HttpRequestBuilder { + let Some(s) = session_id.filter(|id| !id.is_empty()) else { + return builder; + }; + builder = builder.header(header::DD_SESSION_ID, s); + if let Some(r) = root_session_id + .filter(|r| !r.is_empty()) + .filter(|r| *r != s) + { + builder = builder.header(header::DD_ROOT_SESSION_ID, r); + } + if let Some(p) = parent_session_id.filter(|p| !p.is_empty()) { + builder = builder.header(header::DD_PARENT_SESSION_ID, p); } - Ok(b) + builder } pub type ResponseFuture = diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index b3263c4ad5..5e5263389d 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -136,8 +136,9 @@ pub struct TelemetryWorker { cancellation_token: CancellationToken, seq_id: AtomicU64, runtime_id: String, - telemetry_session_id: Option, - telemetry_root_session_id: Option, + session_id: Option, + root_session_id: Option, + parent_session_id: Option, client: Box, metrics_flush_interval: Duration, deadlines: scheduler::Scheduler, @@ -152,8 +153,9 @@ impl Debug for TelemetryWorker { .field("cancellation_token", &self.cancellation_token) .field("seq_id", &self.seq_id) .field("runtime_id", &self.runtime_id) - .field("telemetry_session_id", &self.telemetry_session_id) - .field("telemetry_root_session_id", &self.telemetry_root_session_id) + .field("session_id", &self.session_id) + .field("root_session_id", &self.root_session_id) + .field("parent_session_id", &self.parent_session_id) .field("metrics_flush_interval", &self.metrics_flush_interval) .field("deadlines", &self.deadlines) .field("data", &self.data) @@ -762,7 +764,6 @@ impl TelemetryWorker { ) .header( http_client::header::LIBRARY_LANGUAGE, - // Note: passing by ref here just causes the clone to happen underneath tel.application.language_name.clone(), ) .header( @@ -771,23 +772,15 @@ impl TelemetryWorker { ); let req = http_client::add_instrumentation_session_headers( req, - self.telemetry_session_id.as_deref(), - &self.runtime_id, - self.telemetry_root_session_id.as_deref(), - )?; + self.session_id.as_deref(), + self.root_session_id.as_deref(), + self.parent_session_id.as_deref(), + ); let body = http_common::Body::from(serialize::serialize(&tel)?); Ok(req.body(body)?) } - #[cfg(test)] - pub(crate) fn test_build_request( - &self, - payload: &data::Payload, - ) -> anyhow::Result { - self.build_request(payload) - } - async fn send_request( &self, req: http_common::HttpRequest, @@ -1051,8 +1044,9 @@ pub struct TelemetryWorkerBuilder { pub host: Host, pub application: Application, pub runtime_id: Option, - pub telemetry_session_id: Option, - pub telemetry_root_session_id: Option, + pub session_id: Option, + pub root_session_id: Option, + pub parent_session_id: Option, pub dependencies: store::Store, pub integrations: store::Store, pub configurations: store::Store, @@ -1104,8 +1098,9 @@ impl TelemetryWorkerBuilder { ..Default::default() }, runtime_id: None, - telemetry_session_id: None, - telemetry_root_session_id: None, + session_id: None, + root_session_id: None, + parent_session_id: None, dependencies: store::Store::new(MAX_ITEMS), integrations: store::Store::new(MAX_ITEMS), configurations: store::Store::new(MAX_ITEMS), @@ -1156,8 +1151,9 @@ impl TelemetryWorkerBuilder { runtime_id: self .runtime_id .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), - telemetry_session_id: self.telemetry_session_id, - telemetry_root_session_id: self.telemetry_root_session_id, + session_id: self.session_id, + root_session_id: self.root_session_id, + parent_session_id: self.parent_session_id, client, metrics_flush_interval, deadlines: scheduler::Scheduler::new(vec![ @@ -1215,18 +1211,17 @@ impl TelemetryWorkerBuilder { #[cfg(test)] mod tests { use crate::data::Payload; - use crate::worker::http_client::header::{DD_ROOT_SESSION_ID, DD_SESSION_ID}; - use crate::worker::{TelemetryWorker, TelemetryWorkerBuilder, TelemetryWorkerHandle}; + use crate::worker::http_client::header::{ + DD_PARENT_SESSION_ID, DD_ROOT_SESSION_ID, DD_SESSION_ID, + }; + use crate::worker::{TelemetryWorker, TelemetryWorkerBuilder}; use libdd_common::Endpoint; use tokio::runtime::Runtime; - fn is_send(_: T) {} - fn is_sync(_: T) {} - - fn worker_with_sessions( - runtime_id: Option, - telemetry_session_id: Option, - telemetry_root_session_id: Option, + fn test_worker( + session_id: Option, + root_session_id: Option, + parent_session_id: Option, ) -> TelemetryWorker { let mut b = TelemetryWorkerBuilder::new( "h".into(), @@ -1238,59 +1233,72 @@ mod tests { b.config .set_endpoint(Endpoint::from_slice("http://127.0.0.1:1")) .unwrap(); - b.runtime_id = runtime_id; - b.telemetry_session_id = telemetry_session_id; - b.telemetry_root_session_id = telemetry_root_session_id; + b.runtime_id = Some("rid".into()); + b.session_id = session_id; + b.root_session_id = root_session_id; + b.parent_session_id = parent_session_id; let rt = Runtime::new().unwrap(); b.build_worker(rt.handle().clone()).1 } #[test] - fn telemetry_http_request_always_includes_dd_session_id_from_runtime_id() { - let worker = worker_with_sessions(Some("run-9abc".into()), None, None); - let req = worker - .test_build_request(&Payload::AppHeartbeat(())) + fn telemetry_http_includes_dd_session_id() { + let req = test_worker(Some("sess".into()), None, None) + .build_request(&Payload::AppHeartbeat(())) .unwrap(); assert_eq!( - req.headers() - .get(DD_SESSION_ID) - .expect("dd-session-id") - .to_str() - .unwrap(), - "run-9abc" + req.headers().get(DD_SESSION_ID).unwrap().to_str().unwrap(), + "sess" ); assert!(req.headers().get(DD_ROOT_SESSION_ID).is_none()); + assert!(req.headers().get(DD_PARENT_SESSION_ID).is_none()); } #[test] - fn telemetry_http_request_includes_dd_root_session_id_when_set_and_distinct() { - let worker = worker_with_sessions( - Some("run-9abc".into()), - Some("sess-child".into()), - Some("sess-root".into()), - ); - let req = worker - .test_build_request(&Payload::AppHeartbeat(())) - .unwrap(); + fn telemetry_http_includes_dd_session_id_and_root_session_id() { + let req = test_worker( + Some("sess".into()), + Some("root".into()), + None, + ) + .build_request(&Payload::AppHeartbeat(())) + .unwrap(); assert_eq!( req.headers().get(DD_SESSION_ID).unwrap().to_str().unwrap(), - "sess-child" + "sess" ); assert_eq!( req.headers() .get(DD_ROOT_SESSION_ID) - .expect("dd-root-session-id") + .unwrap() .to_str() .unwrap(), - "sess-root" + "root" ); + assert!(req.headers().get(DD_PARENT_SESSION_ID).is_none()); } #[test] - fn test_handle_sync_send() { - #[allow(clippy::redundant_closure)] - let _ = |h: TelemetryWorkerHandle| is_send(h); - #[allow(clippy::redundant_closure)] - let _ = |h: TelemetryWorkerHandle| is_sync(h); + fn telemetry_http_includes_dd_session_id_and_parent_session_id() { + let req = test_worker( + Some("sess".into()), + None, + Some("parent".into()), + ) + .build_request(&Payload::AppHeartbeat(())) + .unwrap(); + assert_eq!( + req.headers().get(DD_SESSION_ID).unwrap().to_str().unwrap(), + "sess" + ); + assert!(req.headers().get(DD_ROOT_SESSION_ID).is_none()); + assert_eq!( + req.headers() + .get(DD_PARENT_SESSION_ID) + .unwrap() + .to_str() + .unwrap(), + "parent" + ); } } From 85d5a6936eaa9bd5731806db62c40eba6212ef17 Mon Sep 17 00:00:00 2001 From: Munir Date: Fri, 27 Mar 2026 19:40:32 -0400 Subject: [PATCH 3/6] update datapipeline --- examples/ffi/trace_exporter.c | 5 +- libdd-data-pipeline-ffi/src/trace_exporter.rs | 58 +++++++++++++++++++ libdd-data-pipeline/src/telemetry/mod.rs | 32 +++++++++- .../src/trace_exporter/builder.rs | 10 ++++ libdd-data-pipeline/src/trace_exporter/mod.rs | 6 ++ libdd-telemetry/src/worker/mod.rs | 42 +++++++------- 6 files changed, 131 insertions(+), 22 deletions(-) diff --git a/examples/ffi/trace_exporter.c b/examples/ffi/trace_exporter.c index 3d646dbc6d..035ab94941 100644 --- a/examples/ffi/trace_exporter.c +++ b/examples/ffi/trace_exporter.c @@ -99,7 +99,10 @@ int main(int argc, char** argv) ddog_TelemetryClientConfig telemetry_config = { .interval = 60000, .runtime_id = DDOG_CHARSLICE_C("12345678-1234-1234-1234-123456789abc"), - .debug_enabled = true + .debug_enabled = true, + .session_id = DDOG_CHARSLICE_C(""), + .root_session_id = DDOG_CHARSLICE_C(""), + .parent_session_id = DDOG_CHARSLICE_C(""), }; ret = ddog_trace_exporter_config_enable_telemetry(config, &telemetry_config); diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index b8d23b01c6..e470acd034 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -43,6 +43,13 @@ pub struct TelemetryClientConfig<'a> { /// When enabled, sets the DD-Telemetry-Debug-Enabled header to true. /// Defaults to false. pub debug_enabled: bool, + + /// Instrumentation session id (`dd-session-id`), same encoding rules as [`Self::runtime_id`]. + pub session_id: CharSlice<'a>, + /// Root session id (`dd-root-session-id`). + pub root_session_id: CharSlice<'a>, + /// Parent session id (`dd-parent-session-id`). + pub parent_session_id: CharSlice<'a>, } /// The TraceExporterConfig object will hold the configuration properties for the TraceExporter. @@ -301,6 +308,18 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_enable_telemetry( Err(e) => return Some(e), }, debug_enabled: telemetry_cfg.debug_enabled, + session_id: match sanitize_string(telemetry_cfg.session_id) { + Ok(s) => Some(s), + Err(e) => return Some(e), + }, + root_session_id: match sanitize_string(telemetry_cfg.root_session_id) { + Ok(s) => Some(s), + Err(e) => return Some(e), + }, + parent_session_id: match sanitize_string(telemetry_cfg.parent_session_id) { + Ok(s) => Some(s), + Err(e) => return Some(e), + }, }; debug!(telemetry_cfg = ?cfg, "Configuring telemetry"); config.telemetry_cfg = Some(cfg); @@ -831,6 +850,9 @@ mod tests { interval: 1000, runtime_id: CharSlice::from("id"), debug_enabled: false, + session_id: CharSlice::empty(), + root_session_id: CharSlice::empty(), + parent_session_id: CharSlice::empty(), }), ); assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); @@ -848,6 +870,9 @@ mod tests { interval: 1000, runtime_id: CharSlice::from("foo"), debug_enabled: true, + session_id: CharSlice::empty(), + root_session_id: CharSlice::empty(), + parent_session_id: CharSlice::empty(), }), ); assert!(error.is_none()); @@ -863,6 +888,36 @@ mod tests { "foo" ); assert!(cfg.telemetry_cfg.as_ref().unwrap().debug_enabled); + assert_eq!( + cfg.telemetry_cfg.as_ref().unwrap().session_id.as_deref(), + Some("") + ); + assert_eq!( + cfg.telemetry_cfg.as_ref().unwrap().root_session_id.as_deref(), + Some("") + ); + assert_eq!( + cfg.telemetry_cfg.as_ref().unwrap().parent_session_id.as_deref(), + Some("") + ); + + let mut cfg = TraceExporterConfig::default(); + let error = ddog_trace_exporter_config_enable_telemetry( + Some(&mut cfg), + Some(&TelemetryClientConfig { + interval: 500, + runtime_id: CharSlice::from("rid"), + debug_enabled: false, + session_id: CharSlice::from("sess-z"), + root_session_id: CharSlice::from("root-z"), + parent_session_id: CharSlice::from("par-z"), + }), + ); + assert!(error.is_none()); + let t = cfg.telemetry_cfg.as_ref().unwrap(); + assert_eq!(t.session_id.as_deref(), Some("sess-z")); + assert_eq!(t.root_session_id.as_deref(), Some("root-z")); + assert_eq!(t.parent_session_id.as_deref(), Some("par-z")); } } @@ -1143,6 +1198,9 @@ mod tests { heartbeat: 10000, runtime_id: Some("foo".to_string()), debug_enabled: true, + session_id: None, + root_session_id: None, + parent_session_id: None, }), ..Default::default() }; diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 9715aa50ae..4b8995f738 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -32,6 +32,9 @@ pub struct TelemetryClientBuilder { tracer_version: Option, config: libdd_telemetry::config::Config, runtime_id: Option, + session_id: Option, + root_session_id: Option, + parent_session_id: Option, } impl TelemetryClientBuilder { @@ -93,6 +96,24 @@ impl TelemetryClientBuilder { self } + /// Sets the instrumentation session id sent as the `dd-session-id` header on telemetry requests. + pub fn set_session_id(mut self, id: &str) -> Self { + self.session_id = Some(id.to_string()); + self + } + + /// Sets the root session id sent as the `dd-root-session-id` header (only with a valid session id). + pub fn set_root_session_id(mut self, id: &str) -> Self { + self.root_session_id = Some(id.to_string()); + self + } + + /// Sets the parent session id sent as the `dd-parent-session-id` header (only with a valid session id). + pub fn set_parent_session_id(mut self, id: &str) -> Self { + self.parent_session_id = Some(id.to_string()); + self + } + /// Sets the debug enabled flag for the telemetry client. pub fn set_debug_enabled(mut self, debug: bool) -> Self { self.config.debug_enabled = debug; @@ -117,6 +138,9 @@ impl TelemetryClientBuilder { if let Some(id) = self.runtime_id { builder.runtime_id = Some(id); } + builder.session_id = self.session_id; + builder.root_session_id = self.root_session_id; + builder.parent_session_id = self.parent_session_id; let (worker_handle, worker) = builder.build_worker(runtime); @@ -854,7 +878,10 @@ mod tests { let telemetry_srv = server .mock_async(|when, then| { when.method(POST) - .body_includes(r#""application":{"service_name":"test_service","service_version":"test_version","env":"test_env","language_name":"test_language","language_version":"test_language_version","tracer_version":"test_tracer_version"}"#); + .body_includes(r#""application":{"service_name":"test_service","service_version":"test_version","env":"test_env","language_name":"test_language","language_version":"test_language_version","tracer_version":"test_tracer_version"}"#) + .header("dd-session-id", "sess-e2e") + .header("dd-root-session-id", "root-e2e") + .header("dd-parent-session-id", "parent-e2e"); then.status(200).body(""); }) .await; @@ -869,6 +896,9 @@ mod tests { .set_url(&server.url("/")) .set_heartbeat(100) .set_runtime_id("foo") + .set_session_id("sess-e2e") + .set_root_session_id("root-e2e") + .set_parent_session_id("parent-e2e") .build(Handle::current()); tokio::spawn(async move { worker.run().await }); diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index b19e8b06af..a22f32133e 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -308,6 +308,15 @@ impl TraceExporterBuilder { if let Some(id) = telemetry_config.runtime_id { builder = builder.set_runtime_id(&id); } + if let Some(ref id) = telemetry_config.session_id { + builder = builder.set_session_id(id); + } + if let Some(ref id) = telemetry_config.root_session_id { + builder = builder.set_root_session_id(id); + } + if let Some(ref id) = telemetry_config.parent_session_id { + builder = builder.set_parent_session_id(id); + } builder.build(runtime.handle().clone()) }); @@ -437,6 +446,7 @@ mod tests { heartbeat: 1000, runtime_id: None, debug_enabled: false, + ..Default::default() }); let exporter = builder.build().unwrap(); diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 06a838ed6c..2859a1e823 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -904,6 +904,12 @@ pub struct TelemetryConfig { pub heartbeat: u64, pub runtime_id: Option, pub debug_enabled: bool, + /// Sent as the `dd-session-id` telemetry header when set. + pub session_id: Option, + /// Sent as `dd-root-session-id` when set (only with a valid session id). + pub root_session_id: Option, + /// Sent as `dd-parent-session-id` when set (only with a valid session id). + pub parent_session_id: Option, } #[allow(missing_docs)] diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 5e5263389d..72faedab13 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -1215,7 +1215,7 @@ mod tests { DD_PARENT_SESSION_ID, DD_ROOT_SESSION_ID, DD_SESSION_ID, }; use crate::worker::{TelemetryWorker, TelemetryWorkerBuilder}; - use libdd_common::Endpoint; + use libdd_common::{http_common, Endpoint}; use tokio::runtime::Runtime; fn test_worker( @@ -1255,11 +1255,30 @@ mod tests { } #[test] - fn telemetry_http_includes_dd_session_id_and_root_session_id() { + fn telemetry_http_omits_session_family_without_valid_session_id() { + let assert_no_session_headers = |req: &http_common::HttpRequest| { + assert!(req.headers().get(DD_SESSION_ID).is_none()); + assert!(req.headers().get(DD_ROOT_SESSION_ID).is_none()); + assert!(req.headers().get(DD_PARENT_SESSION_ID).is_none()); + }; + + let req = test_worker(None, Some("root".into()), Some("parent".into())) + .build_request(&Payload::AppHeartbeat(())) + .unwrap(); + assert_no_session_headers(&req); + + let req = test_worker(Some(String::new()), Some("root".into()), Some("parent".into())) + .build_request(&Payload::AppHeartbeat(())) + .unwrap(); + assert_no_session_headers(&req); + } + + #[test] + fn telemetry_http_includes_dd_session_root_and_parent_session_ids() { let req = test_worker( Some("sess".into()), Some("root".into()), - None, + Some("parent".into()), ) .build_request(&Payload::AppHeartbeat(())) .unwrap(); @@ -1275,23 +1294,6 @@ mod tests { .unwrap(), "root" ); - assert!(req.headers().get(DD_PARENT_SESSION_ID).is_none()); - } - - #[test] - fn telemetry_http_includes_dd_session_id_and_parent_session_id() { - let req = test_worker( - Some("sess".into()), - None, - Some("parent".into()), - ) - .build_request(&Payload::AppHeartbeat(())) - .unwrap(); - assert_eq!( - req.headers().get(DD_SESSION_ID).unwrap().to_str().unwrap(), - "sess" - ); - assert!(req.headers().get(DD_ROOT_SESSION_ID).is_none()); assert_eq!( req.headers() .get(DD_PARENT_SESSION_ID) From d9aa6eab3cd2cbf6def671be6392fc58189d0eae Mon Sep 17 00:00:00 2001 From: Munir Date: Fri, 27 Mar 2026 20:00:39 -0400 Subject: [PATCH 4/6] fmt --- libdd-data-pipeline-ffi/src/trace_exporter.rs | 12 +- libdd-telemetry-ffi/src/builder/expanded.rs | 145 +++++++++--------- libdd-telemetry/src/worker/mod.rs | 10 +- 3 files changed, 87 insertions(+), 80 deletions(-) diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index e470acd034..d5fd0c43d3 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -893,11 +893,19 @@ mod tests { Some("") ); assert_eq!( - cfg.telemetry_cfg.as_ref().unwrap().root_session_id.as_deref(), + cfg.telemetry_cfg + .as_ref() + .unwrap() + .root_session_id + .as_deref(), Some("") ); assert_eq!( - cfg.telemetry_cfg.as_ref().unwrap().parent_session_id.as_deref(), + cfg.telemetry_cfg + .as_ref() + .unwrap() + .parent_session_id + .as_deref(), Some("") ); diff --git a/libdd-telemetry-ffi/src/builder/expanded.rs b/libdd-telemetry-ffi/src/builder/expanded.rs index af3d6facd9..840eae1c6d 100644 --- a/libdd-telemetry-ffi/src/builder/expanded.rs +++ b/libdd-telemetry-ffi/src/builder/expanded.rs @@ -250,21 +250,20 @@ mod macros { telemetry_builder: &mut TelemetryWorkerBuilder, param: ffi::CharSlice, ) -> ffi::MaybeError { - telemetry_builder.session_id = - match (|s: ffi::CharSlice| -> Result<_, String> { - Ok(Some(s.to_utf8_lossy().into_owned())) - })(param) - { - Ok(o) => o, - Err(e) => { - return ffi::MaybeError::Some(libdd_common_ffi::Error::from( - ({ - let res = std::fmt::format(format_args!("{e:?}")); - res - }), - )); - } - }; + telemetry_builder.session_id = match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; ffi::MaybeError::None } #[no_mangle] @@ -272,21 +271,20 @@ mod macros { telemetry_builder: &mut TelemetryWorkerBuilder, param: ffi::CharSlice, ) -> ffi::MaybeError { - telemetry_builder.root_session_id = - match (|s: ffi::CharSlice| -> Result<_, String> { - Ok(Some(s.to_utf8_lossy().into_owned())) - })(param) - { - Ok(o) => o, - Err(e) => { - return ffi::MaybeError::Some(libdd_common_ffi::Error::from( - ({ - let res = std::fmt::format(format_args!("{e:?}")); - res - }), - )); - } - }; + telemetry_builder.root_session_id = match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; ffi::MaybeError::None } #[no_mangle] @@ -294,21 +292,20 @@ mod macros { telemetry_builder: &mut TelemetryWorkerBuilder, param: ffi::CharSlice, ) -> ffi::MaybeError { - telemetry_builder.parent_session_id = - match (|s: ffi::CharSlice| -> Result<_, String> { - Ok(Some(s.to_utf8_lossy().into_owned())) - })(param) - { - Ok(o) => o, - Err(e) => { - return ffi::MaybeError::Some(libdd_common_ffi::Error::from( - ({ - let res = std::fmt::format(format_args!("{e:?}")); - res - }), - )); - } - }; + telemetry_builder.parent_session_id = match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; ffi::MaybeError::None } #[repr(C)] @@ -557,21 +554,20 @@ mod macros { }; } SessionId => { - telemetry_builder.session_id = - match (|s: ffi::CharSlice| -> Result<_, String> { - Ok(Some(s.to_utf8_lossy().into_owned())) - })(param) - { - Ok(o) => o, - Err(e) => { - return ffi::MaybeError::Some(libdd_common_ffi::Error::from( - ({ - let res = std::fmt::format(format_args!("{e:?}")); - res - }), - )); - } - }; + telemetry_builder.session_id = match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; } RootSessionId => { telemetry_builder.root_session_id = @@ -848,21 +844,20 @@ mod macros { }; } "session_id" => { - telemetry_builder.session_id = - match (|s: ffi::CharSlice| -> Result<_, String> { - Ok(Some(s.to_utf8_lossy().into_owned())) - })(param) - { - Ok(o) => o, - Err(e) => { - return ffi::MaybeError::Some(libdd_common_ffi::Error::from( - ({ - let res = std::fmt::format(format_args!("{e:?}")); - res - }), - )); - } - }; + telemetry_builder.session_id = match (|s: ffi::CharSlice| -> Result<_, String> { + Ok(Some(s.to_utf8_lossy().into_owned())) + })(param) + { + Ok(o) => o, + Err(e) => { + return ffi::MaybeError::Some(libdd_common_ffi::Error::from( + ({ + let res = std::fmt::format(format_args!("{e:?}")); + res + }), + )); + } + }; } "root_session_id" => { telemetry_builder.root_session_id = diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 72faedab13..dda1ef6685 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -1267,9 +1267,13 @@ mod tests { .unwrap(); assert_no_session_headers(&req); - let req = test_worker(Some(String::new()), Some("root".into()), Some("parent".into())) - .build_request(&Payload::AppHeartbeat(())) - .unwrap(); + let req = test_worker( + Some(String::new()), + Some("root".into()), + Some("parent".into()), + ) + .build_request(&Payload::AppHeartbeat(())) + .unwrap(); assert_no_session_headers(&req); } From 9e85e48c3ae317e617800b206be6a9e8c9c6f010 Mon Sep 17 00:00:00 2001 From: Munir Date: Fri, 27 Mar 2026 20:02:03 -0400 Subject: [PATCH 5/6] fix example --- examples/ffi/trace_exporter.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/ffi/trace_exporter.c b/examples/ffi/trace_exporter.c index 035ab94941..907849d358 100644 --- a/examples/ffi/trace_exporter.c +++ b/examples/ffi/trace_exporter.c @@ -100,8 +100,8 @@ int main(int argc, char** argv) .interval = 60000, .runtime_id = DDOG_CHARSLICE_C("12345678-1234-1234-1234-123456789abc"), .debug_enabled = true, - .session_id = DDOG_CHARSLICE_C(""), - .root_session_id = DDOG_CHARSLICE_C(""), + .session_id = DDOG_CHARSLICE_C("12345678-1234-1234-1234-123456789abc"), + .root_session_id =DDOG_CHARSLICE_C("87654321-1234-1234-1234-123456789abc"), .parent_session_id = DDOG_CHARSLICE_C(""), }; From 9ac72b7ce39860ebb7202a990830683514a1ff95 Mon Sep 17 00:00:00 2001 From: Munir Date: Fri, 27 Mar 2026 20:07:06 -0400 Subject: [PATCH 6/6] fmt 2 --- libdd-data-pipeline/src/telemetry/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/libdd-data-pipeline/src/telemetry/mod.rs b/libdd-data-pipeline/src/telemetry/mod.rs index 4b8995f738..45e8eaa0af 100644 --- a/libdd-data-pipeline/src/telemetry/mod.rs +++ b/libdd-data-pipeline/src/telemetry/mod.rs @@ -96,19 +96,22 @@ impl TelemetryClientBuilder { self } - /// Sets the instrumentation session id sent as the `dd-session-id` header on telemetry requests. + /// Sets the instrumentation session id sent as the `dd-session-id` header on telemetry + /// requests. pub fn set_session_id(mut self, id: &str) -> Self { self.session_id = Some(id.to_string()); self } - /// Sets the root session id sent as the `dd-root-session-id` header (only with a valid session id). + /// Sets the root session id sent as the `dd-root-session-id` header (only with a valid session + /// id). pub fn set_root_session_id(mut self, id: &str) -> Self { self.root_session_id = Some(id.to_string()); self } - /// Sets the parent session id sent as the `dd-parent-session-id` header (only with a valid session id). + /// Sets the parent session id sent as the `dd-parent-session-id` header (only with a valid + /// session id). pub fn set_parent_session_id(mut self, id: &str) -> Self { self.parent_session_id = Some(id.to_string()); self