Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libdd-data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ mod tests {
SystemTime::now() - BUCKETS_DURATION * 3,
vec![],
vec![],
vec![],
);
let mut trace = vec![];

Expand Down
1 change: 1 addition & 0 deletions libdd-data-pipeline/src/trace_exporter/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub(crate) fn start_stats_computation(
std::time::SystemTime::now(),
span_kinds,
peer_tags,
vec![], // TODO: Add span-derived primary tags
)));
let cancellation_token = CancellationToken::new();
create_and_start_stats_worker(
Expand Down
1 change: 1 addition & 0 deletions libdd-trace-stats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ let mut concentrator = SpanConcentrator::new(
SystemTime::now(),
vec!["client".to_string(), "server".to_string()], // eligible span kinds
vec!["peer.service".to_string()], // peer tag keys
vec![], // span-derived primary tag keys
);

// Add spans
Expand Down
1 change: 1 addition & 0 deletions libdd-trace-stats/benches/span_concentrator_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
now,
vec![],
vec!["db_name".into(), "bucket_s3".into()],
vec![],
);
let mut spans = vec![];
for trace_id in 1..100 {
Expand Down
105 changes: 100 additions & 5 deletions libdd-trace-stats/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub(super) struct BorrowedAggregationKey<'a> {
http_endpoint: &'a str,
grpc_status_code: Option<u8>,
service_source: &'a str,
span_derived_primary_tags: Vec<(&'a str, &'a str)>,
}

impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
Expand All @@ -59,6 +60,7 @@ impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
http_endpoint,
grpc_status_code,
service_source,
span_derived_primary_tags,
}: &OwnedAggregationKey,
) -> bool {
self.resource_name == resource_name
Expand All @@ -79,6 +81,12 @@ impl hashbrown::Equivalent<OwnedAggregationKey> for BorrowedAggregationKey<'_> {
&& self.http_endpoint == http_endpoint
&& self.grpc_status_code == *grpc_status_code
&& self.service_source == service_source
&& self.span_derived_primary_tags.len() == span_derived_primary_tags.len()
&& self
.span_derived_primary_tags
.iter()
.zip(span_derived_primary_tags.iter())
.all(|((k1, v1), (k2, v2))| k1 == k2 && v1 == v2)
}
}

Expand All @@ -104,6 +112,7 @@ pub(super) struct OwnedAggregationKey {
http_endpoint: String,
grpc_status_code: Option<u8>,
service_source: String,
span_derived_primary_tags: Vec<(String, String)>,
}

impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey {
Expand All @@ -126,6 +135,11 @@ impl From<&BorrowedAggregationKey<'_>> for OwnedAggregationKey {
http_endpoint: value.http_endpoint.to_owned(),
grpc_status_code: value.grpc_status_code,
service_source: value.service_source.to_owned(),
span_derived_primary_tags: value
.span_derived_primary_tags
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
}
}
}
Expand Down Expand Up @@ -208,9 +222,16 @@ fn grpc_status_str_to_int_value(v: &str) -> Option<u8> {
impl<'a> BorrowedAggregationKey<'a> {
/// Return an AggregationKey matching the given span.
///
/// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the
/// If `peer_tag_keys` is not empty then the peer tags of the span will be included in the
/// key.
pub(super) fn from_span<T: StatSpan<'a>>(span: &'a T, peer_tag_keys: &'a [String]) -> Self {
///
/// If `span_derived_primary_tag_keys` is not empty then matching span meta keys are included
/// as `span_derived_primary_tags` in the key (same `key:value` encoding as peer tags).
pub(super) fn from_span<T: StatSpan<'a>>(
span: &'a T,
peer_tag_keys: &'a [String],
span_derived_primary_tag_keys: &'a [String],
) -> Self {
let span_kind = span.get_meta(TAG_SPANKIND).unwrap_or_default();
let peer_tags = if should_track_peer_tags(span_kind) {
// Parse the meta tags of the span and return a list of the peer tags based on the list
Expand Down Expand Up @@ -245,6 +266,11 @@ impl<'a> BorrowedAggregationKey<'a> {

let service_source = span.get_meta(TAG_SVC_SRC).unwrap_or_default();

let span_derived_primary_tags: Vec<(&'a str, &'a str)> = span_derived_primary_tag_keys
.iter()
.filter_map(|key| Some(((key.as_str()), (span.get_meta(key.as_str())?))))
.collect();

Self {
resource_name: span.resource(),
service_name: span.service(),
Expand All @@ -261,6 +287,7 @@ impl<'a> BorrowedAggregationKey<'a> {
http_endpoint,
grpc_status_code,
service_source,
span_derived_primary_tags,
}
}
}
Expand Down Expand Up @@ -288,6 +315,14 @@ impl From<pb::ClientGroupedStats> for OwnedAggregationKey {
http_endpoint: value.http_endpoint,
grpc_status_code: value.grpc_status_code.parse().ok(),
service_source: value.service_source,
span_derived_primary_tags: value
.span_derived_primary_tags
.into_iter()
.filter_map(|t| {
let (key, value) = t.split_once(':')?;
Some((key.to_string(), value.to_string()))
})
.collect(),
}
}
}
Expand Down Expand Up @@ -418,7 +453,11 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl
.map(|c| c.to_string())
.unwrap_or_default(),
service_source: key.service_source,
span_derived_primary_tags: vec![], // Todo
span_derived_primary_tags: key
.span_derived_primary_tags
.into_iter()
.map(|(k, v)| format!("{k}:{v}"))
.collect(),
}
}

Expand Down Expand Up @@ -820,6 +859,51 @@ mod tests {
),
];

let test_primary_tag_keys = vec!["region".to_string(), "env".to_string()];
let test_cases_primary_tags: Vec<(SpanSlice, OwnedAggregationKey)> = vec![
(
SpanSlice {
service: "service",
name: "op",
resource: "res",
span_id: 1,
parent_id: 0,
meta: HashMap::from([("region", "us1"), ("env", "prod")]),
..Default::default()
},
OwnedAggregationKey {
service_name: "service".into(),
operation_name: "op".into(),
resource_name: "res".into(),
is_trace_root: true,
span_derived_primary_tags: vec![
("region".into(), "us1".into()),
("env".into(), "prod".into()),
],
..Default::default()
},
),
(
SpanSlice {
service: "service",
name: "op",
resource: "res",
span_id: 1,
parent_id: 0,
meta: HashMap::from([("region", "us1")]),
..Default::default()
},
OwnedAggregationKey {
service_name: "service".into(),
operation_name: "op".into(),
resource_name: "res".into(),
is_trace_root: true,
span_derived_primary_tags: vec![("region".into(), "us1".into())],
..Default::default()
},
),
];

let test_peer_tags = vec![
"aws.s3.bucket".to_string(),
"db.instance".to_string(),
Expand Down Expand Up @@ -907,7 +991,7 @@ mod tests {
];

for (span, expected_key) in test_cases {
let borrowed_key = BorrowedAggregationKey::from_span(&span, &[]);
let borrowed_key = BorrowedAggregationKey::from_span(&span, &[], &[]);
assert_eq!(
OwnedAggregationKey::from(&borrowed_key),
expected_key,
Expand All @@ -919,8 +1003,19 @@ mod tests {
);
}

for (span, expected_key) in test_cases_primary_tags {
let borrowed_key =
BorrowedAggregationKey::from_span(&span, &[], test_primary_tag_keys.as_slice());
assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key);
assert_eq!(
get_hash(&borrowed_key),
get_hash(&OwnedAggregationKey::from(&borrowed_key))
);
}

for (span, expected_key) in test_cases_with_peer_tags {
let borrowed_key = BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice());
let borrowed_key =
BorrowedAggregationKey::from_span(&span, test_peer_tags.as_slice(), &[]);
assert_eq!(OwnedAggregationKey::from(&borrowed_key), expected_key);
assert_eq!(
get_hash(&borrowed_key),
Expand Down
18 changes: 16 additions & 2 deletions libdd-trace-stats/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,23 @@ pub struct SpanConcentrator {
span_kinds_stats_computed: Vec<String>,
/// keys for supplementary tags that describe peer.service entities
peer_tag_keys: Vec<String>,
/// keys for second primary tags on trace stats
span_derived_primary_tag_keys: Vec<String>,
}

impl SpanConcentrator {
/// Return a new concentrator with the given parameters
/// - `bucket_size` is the size of the time buckets
/// - `now` the current system time, used to define the oldest bucket
/// - `span_kinds_stats_computed` list of span kinds eligible for stats computation
/// - `peer_tags_keys` list of keys considered as peer tags for aggregation
/// - `peer_tag_keys` list of keys considered as peer tags for aggregation
/// - `span_derived_primary_tag_keys` list of keys considered as second primary tags on trace stats
pub fn new(
bucket_size: Duration,
now: SystemTime,
span_kinds_stats_computed: Vec<String>,
peer_tag_keys: Vec<String>,
span_derived_primary_tag_keys: Vec<String>,
) -> SpanConcentrator {
SpanConcentrator {
bucket_size: bucket_size.as_nanos() as u64,
Expand All @@ -90,6 +94,7 @@ impl SpanConcentrator {
buffer_len: 2,
span_kinds_stats_computed,
peer_tag_keys,
span_derived_primary_tag_keys,
}
}

Expand All @@ -103,6 +108,11 @@ impl SpanConcentrator {
self.peer_tag_keys = peer_tags;
}

/// Set the keys considered as second primary tags on trace stats.
///
/// Span meta entries whose key appears in `tag_keys` are included in the
/// stats aggregation key as span-derived primary tags. Calling this replaces
/// any previously configured key list.
pub fn set_span_derived_primary_tags(&mut self, tag_keys: Vec<String>) {
    self.span_derived_primary_tag_keys = tag_keys;
}

/// Return the bucket size used for aggregation
pub fn get_bucket_size(&self) -> Duration {
Duration::from_nanos(self.bucket_size)
Expand All @@ -124,7 +134,11 @@ impl SpanConcentrator {
bucket_timestamp = self.oldest_timestamp;
}

let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice());
let agg_key = BorrowedAggregationKey::from_span(
span,
self.peer_tag_keys.as_slice(),
self.span_derived_primary_tag_keys.as_slice(),
);

self.buckets
.entry(bucket_timestamp)
Expand Down
Loading
Loading