Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions libdd-telemetry/src/worker/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ pub mod header {

/// Header key for whether to enable debug mode of telemetry.
pub const DEBUG_ENABLED: HeaderName = HeaderName::from_static("dd-telemetry-debug-enabled");

/// Header identifying the current runtime session (equals runtime_id).
pub const SESSION_ID: HeaderName = HeaderName::from_static("dd-session-id");

/// Header identifying the root session (first runtime_id in the process tree).
/// Only sent when the current session is not the root (i.e. in child processes).
pub const ROOT_SESSION_ID: HeaderName = HeaderName::from_static("dd-root-session-id");
}

pub type ResponseFuture =
Expand Down
23 changes: 19 additions & 4 deletions libdd-telemetry/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ pub struct TelemetryWorker {
cancellation_token: CancellationToken,
seq_id: AtomicU64,
runtime_id: String,
root_session_id: String,
client: Box<dyn http_client::HttpClient + Sync + Send>,
metrics_flush_interval: Duration,
deadlines: scheduler::Scheduler<LifecycleAction>,
Expand Down Expand Up @@ -745,7 +746,7 @@ impl TelemetryWorker {

telemetry_worker_log!(self, DEBUG, "Prepared payload: {:?}", tel);

let req = http_client::request_builder(&self.config)?
let mut req = http_client::request_builder(&self.config)?
.method(http::Method::POST)
.header(header::CONTENT_TYPE, serialize::CONTENT_TYPE_VALUE)
.header(
Expand All @@ -764,7 +765,15 @@ impl TelemetryWorker {
.header(
http_client::header::LIBRARY_VERSION,
tel.application.tracer_version.clone(),
)
.header(http_client::header::SESSION_ID, self.runtime_id.clone());

if self.runtime_id != self.root_session_id {
req = req.header(
http_client::header::ROOT_SESSION_ID,
self.root_session_id.clone(),
);
}

let body = http_common::Body::from(serialize::serialize(&tel)?);
Ok(req.body(body)?)
Expand Down Expand Up @@ -1033,6 +1042,7 @@ pub struct TelemetryWorkerBuilder {
pub host: Host,
pub application: Application,
pub runtime_id: Option<String>,
pub root_session_id: Option<String>,
pub dependencies: store::Store<data::Dependency>,
pub integrations: store::Store<data::Integration>,
pub configurations: store::Store<data::Configuration>,
Expand Down Expand Up @@ -1084,6 +1094,7 @@ impl TelemetryWorkerBuilder {
..Default::default()
},
runtime_id: None,
root_session_id: None,
dependencies: store::Store::new(MAX_ITEMS),
integrations: store::Store::new(MAX_ITEMS),
configurations: store::Store::new(MAX_ITEMS),
Expand Down Expand Up @@ -1113,6 +1124,11 @@ impl TelemetryWorkerBuilder {
let metrics_flush_interval =
telemetry_heartbeat_interval.min(MetricBuckets::METRICS_FLUSH_INTERVAL);

let runtime_id = self
.runtime_id
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let root_session_id = self.root_session_id.unwrap_or_else(|| runtime_id.clone());

#[allow(clippy::unwrap_used)]
let worker = TelemetryWorker {
flavor: self.flavor,
Expand All @@ -1131,9 +1147,8 @@ impl TelemetryWorkerBuilder {
config,
mailbox,
seq_id: AtomicU64::new(1),
runtime_id: self
.runtime_id
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
runtime_id,
root_session_id,
client,
metrics_flush_interval,
deadlines: scheduler::Scheduler::new(vec![
Expand Down
Loading