From 49ba1c9d33734ec3bd8cc5e7e57bb63a1f832c49 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Fri, 27 Mar 2026 12:59:56 -0400 Subject: [PATCH] implement span derived primary tags --- libdd-data-pipeline/src/stats_exporter.rs | 1 + .../src/trace_exporter/stats.rs | 1 + libdd-trace-stats/README.md | 1 + .../benches/span_concentrator_bench.rs | 1 + .../src/span_concentrator/aggregation.rs | 105 +++++++++++- .../src/span_concentrator/mod.rs | 18 +- .../src/span_concentrator/tests.rs | 159 +++++++++++++++++- 7 files changed, 271 insertions(+), 15 deletions(-) diff --git a/libdd-data-pipeline/src/stats_exporter.rs b/libdd-data-pipeline/src/stats_exporter.rs index db7cd0cadc..c7cdfe5f56 100644 --- a/libdd-data-pipeline/src/stats_exporter.rs +++ b/libdd-data-pipeline/src/stats_exporter.rs @@ -232,6 +232,7 @@ mod tests { SystemTime::now() - BUCKETS_DURATION * 3, vec![], vec![], + vec![], ); let mut trace = vec![]; diff --git a/libdd-data-pipeline/src/trace_exporter/stats.rs b/libdd-data-pipeline/src/trace_exporter/stats.rs index 943ebc5dd1..ca33925e6e 100644 --- a/libdd-data-pipeline/src/trace_exporter/stats.rs +++ b/libdd-data-pipeline/src/trace_exporter/stats.rs @@ -72,6 +72,7 @@ pub(crate) fn start_stats_computation( std::time::SystemTime::now(), span_kinds, peer_tags, + vec![], // TODO: Add span-derived primary tags ))); let cancellation_token = CancellationToken::new(); create_and_start_stats_worker( diff --git a/libdd-trace-stats/README.md b/libdd-trace-stats/README.md index 1d9ef38a7f..9d87e25fa6 100644 --- a/libdd-trace-stats/README.md +++ b/libdd-trace-stats/README.md @@ -53,6 +53,7 @@ let mut concentrator = SpanConcentrator::new( SystemTime::now(), vec!["client".to_string(), "server".to_string()], // eligible span kinds vec!["peer.service".to_string()], // peer tag keys + vec![], // span-derived primary tag keys ); // Add spans diff --git a/libdd-trace-stats/benches/span_concentrator_bench.rs b/libdd-trace-stats/benches/span_concentrator_bench.rs index 
21c105c952..49c4cd8bae 100644 --- a/libdd-trace-stats/benches/span_concentrator_bench.rs +++ b/libdd-trace-stats/benches/span_concentrator_bench.rs @@ -47,6 +47,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { now, vec![], vec!["db_name".into(), "bucket_s3".into()], + vec![], ); let mut spans = vec![]; for trace_id in 1..100 { diff --git a/libdd-trace-stats/src/span_concentrator/aggregation.rs b/libdd-trace-stats/src/span_concentrator/aggregation.rs index f60113a6ac..fac4dacef7 100644 --- a/libdd-trace-stats/src/span_concentrator/aggregation.rs +++ b/libdd-trace-stats/src/span_concentrator/aggregation.rs @@ -39,6 +39,7 @@ pub(super) struct BorrowedAggregationKey<'a> { http_endpoint: &'a str, grpc_status_code: Option, service_source: &'a str, + span_derived_primary_tags: Vec<(&'a str, &'a str)>, } impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { @@ -59,6 +60,7 @@ impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { http_endpoint, grpc_status_code, service_source, + span_derived_primary_tags, }: &OwnedAggregationKey, ) -> bool { self.resource_name == resource_name @@ -79,6 +81,12 @@ impl hashbrown::Equivalent for BorrowedAggregationKey<'_> { && self.http_endpoint == http_endpoint && self.grpc_status_code == *grpc_status_code && self.service_source == service_source + && self.span_derived_primary_tags.len() == span_derived_primary_tags.len() + && self + .span_derived_primary_tags + .iter() + .zip(span_derived_primary_tags.iter()) + .all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2) } } @@ -104,6 +112,7 @@ pub(super) struct OwnedAggregationKey { http_endpoint: String, grpc_status_code: Option, service_source: String, + span_derived_primary_tags: Vec<(String, String)>, } impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey { @@ -126,6 +135,11 @@ impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey { http_endpoint: value.http_endpoint.to_owned(), grpc_status_code: value.grpc_status_code, service_source: 
value.service_source.to_owned(), + span_derived_primary_tags: value + .span_derived_primary_tags + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), } } } @@ -208,9 +222,16 @@ fn grpc_status_str_to_int_value(v: &str) -> Option { impl<'a> BorrowedAggregationKey<'a> { /// Return an AggregationKey matching the given span. /// - /// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the + /// If `peer_tag_keys` is not empty then the peer tags of the span will be included in the /// key. - pub(super) fn from_span>(span: &'a T, peer_tag_keys: &'a [String]) -> Self { + /// + /// If `span_derived_primary_tag_keys` is not empty then matching span meta keys are included + /// as `span_derived_primary_tags` in the key (same `key:value` encoding as peer tags). + pub(super) fn from_span>( + span: &'a T, + peer_tag_keys: &'a [String], + span_derived_primary_tag_keys: &'a [String], + ) -> Self { let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default(); let peer_tags = if should_track_peer_tags(span_kind) { // Parse the meta tags of the span and return a list of the peer tags based on the list @@ -245,6 +266,11 @@ impl<'a> BorrowedAggregationKey<'a> { let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default(); + let span_derived_primary_tags: Vec<(&'a str, &'a str)> = span_derived_primary_tag_keys + .iter() + .filter_map(|key| Some(((key.as_str()), (span.get_meta(key.as_str())?)))) + .collect(); + Self { resource_name: span.resource(), service_name: span.service(), @@ -261,6 +287,7 @@ impl<'a> BorrowedAggregationKey<'a> { http_endpoint, grpc_status_code, service_source, + span_derived_primary_tags, } } } @@ -288,6 +315,14 @@ impl From for OwnedAggregationKey { http_endpoint: value.http_endpoint, grpc_status_code: value.grpc_status_code.parse().ok(), service_source: value.service_source, + span_derived_primary_tags: value + .span_derived_primary_tags + .into_iter() + .filter_map(|t| { + let (key, value) = 
t.split_once(':')?; + Some((key.to_string(), value.to_string())) + }) + .collect(), } } } @@ -418,7 +453,11 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl .map(|c| c.to_string()) .unwrap_or_default(), service_source: key.service_source, - span_derived_primary_tags: vec![], // Todo + span_derived_primary_tags: key + .span_derived_primary_tags + .into_iter() + .map(|(k, v)| format!("{k}:{v}")) + .collect(), } } @@ -820,6 +859,51 @@ mod tests { ), ]; + let test_primary_tag_keys = vec!["region".to_string(), "env".to_string()]; + let test_cases_primary_tags: Vec<(SpanSlice, OwnedAggregationKey)> = vec![ + ( + SpanSlice { + service: "service", + name: "op", + resource: "res", + span_id: 1, + parent_id: 0, + meta: HashMap::from([("region", "us1"), ("env", "prod")]), + ..Default::default() + }, + OwnedAggregationKey { + service_name: "service".into(), + operation_name: "op".into(), + resource_name: "res".into(), + is_trace_root: true, + span_derived_primary_tags: vec![ + ("region".into(), "us1".into()), + ("env".into(), "prod".into()), + ], + ..Default::default() + }, + ), + ( + SpanSlice { + service: "service", + name: "op", + resource: "res", + span_id: 1, + parent_id: 0, + meta: HashMap::from([("region", "us1")]), + ..Default::default() + }, + OwnedAggregationKey { + service_name: "service".into(), + operation_name: "op".into(), + resource_name: "res".into(), + is_trace_root: true, + span_derived_primary_tags: vec![("region".into(), "us1".into())], + ..Default::default() + }, + ), + ]; + let test_peer_tags = vec![ "aws.s3.bucket".to_string(), "db.instance".to_string(), @@ -907,7 +991,7 @@ mod tests { ]; for (span, expected_key) in test_cases { - let borrowed_key = BorrowedAggregationKey::from_span(&span, &[]); + let borrowed_key = BorrowedAggregationKey::from_span(&span, &[], &[]); assert_eq!( OwnedAggregationKey::from(&borrowed_key), expected_key, @@ -919,8 +1003,19 @@ mod tests { ); } + for (span, expected_key) in 
test_cases_primary_tags { + let borrowed_key = + BorrowedAggregationKey::from_span(&span, &[], test_primary_tag_keys.as_slice()); + assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key); + assert_eq!( + get_hash(&borrowed_key), + get_hash(&OwnedAggregationKey::from(&borrowed_key)) + ); + } + for (span, expected_key) in test_cases_with_peer_tags { - let borrowed_key = BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice()); + let borrowed_key = + BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice(), &[]); assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key); assert_eq!( get_hash(&borrowed_key), diff --git a/libdd-trace-stats/src/span_concentrator/mod.rs b/libdd-trace-stats/src/span_concentrator/mod.rs index 1ad0ba8b24..97605cc7e4 100644 --- a/libdd-trace-stats/src/span_concentrator/mod.rs +++ b/libdd-trace-stats/src/span_concentrator/mod.rs @@ -66,6 +66,8 @@ pub struct SpanConcentrator { span_kinds_stats_computed: Vec, /// keys for supplementary tags that describe peer.service entities peer_tag_keys: Vec, + /// keys for second primary tags on trace stats + span_derived_primary_tag_keys: Vec, } impl SpanConcentrator { @@ -73,12 +75,14 @@ impl SpanConcentrator { /// - `bucket_size` is the size of the time buckets /// - `now` the current system time, used to define the oldest bucket /// - `span_kinds_stats_computed` list of span kinds eligible for stats computation - /// - `peer_tags_keys` list of keys considered as peer tags for aggregation + /// - `peer_tag_keys` list of keys considered as peer tags for aggregation + /// - `span_derived_primary_tag_keys` list of keys considered as second primary tags on trace stats pub fn new( bucket_size: Duration, now: SystemTime, span_kinds_stats_computed: Vec, peer_tag_keys: Vec, + span_derived_primary_tag_keys: Vec, ) -> SpanConcentrator { SpanConcentrator { bucket_size: bucket_size.as_nanos() as u64, @@ -90,6 +94,7 @@ impl SpanConcentrator { buffer_len: 2, 
span_kinds_stats_computed, peer_tag_keys, + span_derived_primary_tag_keys, } } @@ -103,6 +108,11 @@ impl SpanConcentrator { self.peer_tag_keys = peer_tags; } + /// Set the keys considered as second primary tags on trace stats + pub fn set_span_derived_primary_tags(&mut self, tag_keys: Vec) { + self.span_derived_primary_tag_keys = tag_keys; + } + /// Return the bucket size used for aggregation pub fn get_bucket_size(&self) -> Duration { Duration::from_nanos(self.bucket_size) @@ -124,7 +134,11 @@ impl SpanConcentrator { bucket_timestamp = self.oldest_timestamp; } - let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice()); + let agg_key = BorrowedAggregationKey::from_span( + span, + self.peer_tag_keys.as_slice(), + self.span_derived_primary_tag_keys.as_slice(), + ); self.buckets .entry(bucket_timestamp) diff --git a/libdd-trace-stats/src/span_concentrator/tests.rs b/libdd-trace-stats/src/span_concentrator/tests.rs index 0a45bb3151..2fc234d3ee 100644 --- a/libdd-trace-stats/src/span_concentrator/tests.rs +++ b/libdd-trace-stats/src/span_concentrator/tests.rs @@ -99,8 +99,13 @@ fn assert_counts_equal(expected: Vec, actual: Vec