diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 532d206d7..e4a770944 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1134,9 +1134,10 @@ fn start_trace_agent( obfuscation_redis_remove_all_args: false, }; - let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor { - obfuscation_config: Arc::new(obfuscation_config), - }); + let trace_processor = Arc::new(trace_processor::ServerlessTraceProcessor::new( + config.as_ref(), + obfuscation_config, + )); let (span_dedup_service, span_dedup_handle) = span_dedup_service::DedupService::new(); tokio::spawn(span_dedup_service.run()); diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index 4c056d32b..9e663134e 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -226,7 +226,7 @@ pub struct EnvConfig { /// @env `DD_APM_FILTER_TAGS_REQUIRE` /// /// Space-separated list of key:value tag pairs that spans must match to be kept. - /// Only spans matching at least one of these tags will be sent to Datadog. + /// Only spans matching all of these tags will be sent to Datadog. /// Example: "env:production service:api-gateway" #[serde(deserialize_with = "deserialize_apm_filter_tags")] pub apm_filter_tags_require: Option>, @@ -240,7 +240,7 @@ pub struct EnvConfig { /// @env `DD_APM_FILTER_TAGS_REGEX_REQUIRE` /// /// Space-separated list of key:value tag pairs with regex values that spans must match to be kept. - /// Only spans matching at least one of these regex patterns will be sent to Datadog. + /// Only spans matching all of these regex patterns will be sent to Datadog. /// Example: "env:^prod.*$ service:^api-.*$" #[serde(deserialize_with = "deserialize_apm_filter_tags")] pub apm_filter_tags_regex_require: Option>, diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index b38ada8ca..11693ecfd 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1671,9 +1671,11 @@ mod tests { // Create trace sender let trace_sender = Arc::new(SendingTraceProcessor { appsec: None, - processor: Arc::new(trace_processor::ServerlessTraceProcessor { - obfuscation_config: Arc::new(ObfuscationConfig::new().expect("Failed to create ObfuscationConfig")), - }), + processor: Arc::new(trace_processor::ServerlessTraceProcessor::new( + config.as_ref(), + ObfuscationConfig::new() + .expect("Failed to create ObfuscationConfig"), + )), trace_tx: tokio::sync::mpsc::channel(1).0, stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), }); @@ -1779,11 +1781,10 @@ mod tests { tokio::spawn(stats_concentrator_service.run()); let trace_sender = Arc::new(SendingTraceProcessor { appsec: None, - processor: Arc::new(trace_processor::ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }), + processor: Arc::new(trace_processor::ServerlessTraceProcessor::new( + config.as_ref(), + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + )), trace_tx: tokio::sync::mpsc::channel(1).0, stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), }); @@ -2174,11 +2175,10 @@ mod tests { tokio::spawn(stats_concentrator_service.run()); let trace_sender = Arc::new(SendingTraceProcessor { appsec: None, - processor: Arc::new(trace_processor::ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }), + processor: Arc::new(trace_processor::ServerlessTraceProcessor::new( + config.as_ref(), + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + )), trace_tx, stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), }); diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 3864e4c60..6a60e26c4 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -30,7 +30,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::traces::stats_generator::StatsGenerator; use crate::traces::trace_aggregator::{OwnedTracerHeaderTags, SendDataBuilderInfo}; @@ -40,19 +40,135 @@ use libdd_trace_normalization::normalizer::SamplerPriority; #[allow(clippy::module_name_repetitions)] pub struct ServerlessTraceProcessor { pub obfuscation_config: Arc, + tag_filters: Arc, +} + +#[derive(Clone, Debug)] +struct ExactFilter { + key: String, + value: Option, +} + +#[derive(Clone, Debug)] +struct RegexFilter { + key: String, + regex: Option, +} + +#[derive(Clone, Debug, Default)] +struct TagFilters { + require: Vec, + reject: Vec, + require_regex: Vec, + reject_regex: Vec, +} + +impl TagFilters { + fn from_config(config: &config::Config) -> Self { + Self { + require: compile_exact_filters(config.apm_filter_tags_require.as_deref()), + reject: compile_exact_filters(config.apm_filter_tags_reject.as_deref()), + require_regex: compile_regex_filters(config.apm_filter_tags_regex_require.as_deref()), + reject_regex: compile_regex_filters(config.apm_filter_tags_regex_reject.as_deref()), + } + } } struct ChunkProcessor { - config: Arc, obfuscation_config: Arc, tags_provider: Arc, span_pointers: Option>, + tag_filters: Arc, +} + +impl ServerlessTraceProcessor { + #[must_use] + pub fn new( + config: &config::Config, + obfuscation_config: obfuscation_config::ObfuscationConfig, + ) -> Self { + Self { + obfuscation_config: Arc::new(obfuscation_config), + tag_filters: Arc::new(TagFilters::from_config(config)), + } + } +} + +fn compile_exact_filters(filters: Option<&[String]>) -> Vec { + filters + .into_iter() + .flatten() + .map(|filter| match filter.split_once(':') { + Some((key, value)) => ExactFilter { + key: key.trim().to_string(), + value: Some(value.trim().to_string()), + }, + None => ExactFilter { + key: filter.trim().to_string(), + value: None, + }, + }) + .collect() +} + +fn compile_regex_filters(filters: Option<&[String]>) -> Vec { + filters + .into_iter() + .flatten() + .filter_map(|filter| match filter.split_once(':') { + Some((key, pattern)) => if let Ok(regex) = Regex::new(pattern.trim()) { + Some(RegexFilter { + key: key.trim().to_string(), + regex: Some(regex), + }) + } else { + warn!( + "TRACE_PROCESSOR | Invalid regex pattern '{}' for key '{}', skipping filter", + pattern.trim(), + key.trim() + ); + None + }, + None => Some(RegexFilter { + key: filter.trim().to_string(), + regex: None, + }), + }) + .collect() +} + +fn format_exact_filter(filter: &ExactFilter) -> String { + match &filter.value { + None => filter.key.clone(), + Some(value) => format!("{}:{}", filter.key, value), + } +} + +fn format_regex_filter(filter: &RegexFilter) -> String { + match &filter.regex { + None => filter.key.clone(), + Some(regex) => format!("{}:{}", filter.key, regex.as_str()), + } +} + +fn span_matches_exact_filter(span: &Span, filter: &ExactFilter) -> bool { + match &filter.value { + None => span_has_key(span, &filter.key), + Some(value) => span_matches_tag_exact(span, &filter.key, value), + } +} + +fn span_matches_regex_filter(span: &Span, filter: &RegexFilter) -> bool { + match &filter.regex { + None => span_has_key(span, &filter.key), + Some(regex) => span_matches_tag_regex(span, &filter.key, regex), + } } impl TraceChunkProcessor for ChunkProcessor { fn process(&mut self, chunk: &mut pb::TraceChunk, root_span_index: usize) { if let Some(root_span) = chunk.spans.get(root_span_index) - && filter_span_by_tags(root_span, &self.config) + && filter_span_by_tags(root_span, &self.tag_filters) { chunk.spans.clear(); return; @@ -89,70 +205,86 @@ impl TraceChunkProcessor for ChunkProcessor { } } -fn filter_span_by_tags(span: &Span, config: &config::Config) -> bool { +fn filter_span_by_tags(span: &Span, tag_filters: &TagFilters) -> bool { // Handle required tags from DD_APM_FILTER_TAGS_REQUIRE (exact match) - if let Some(require_tags) = &config.apm_filter_tags_require - && !require_tags.is_empty() - { - let matches_require = require_tags + if !tag_filters.require.is_empty() { + let matches_require = tag_filters + .require .iter() - .all(|filter| span_matches_tag_exact_filter(span, filter)); + .all(|filter| span_matches_exact_filter(span, filter)); if !matches_require { debug!( "TRACE_PROCESSOR | Filtering out span '{}' - doesn't match all required tags {}", span.name, - require_tags.join(", ") + tag_filters + .require + .iter() + .map(format_exact_filter) + .collect::>() + .join(", ") ); return true; } } // Handle required regex tags from DD_APM_FILTER_TAGS_REGEX_REQUIRE (regex match) - if let Some(require_regex_tags) = &config.apm_filter_tags_regex_require - && !require_regex_tags.is_empty() - { - let matches_require_regex = require_regex_tags + if !tag_filters.require_regex.is_empty() { + let matches_require_regex = tag_filters + .require_regex .iter() - .all(|filter| span_matches_tag_regex_filter(span, filter)); + .all(|filter| span_matches_regex_filter(span, filter)); if !matches_require_regex { debug!( "TRACE_PROCESSOR | Filtering out span '{}' - doesn't match all required regex tags {}", span.name, - require_regex_tags.join(", ") + tag_filters + .require_regex + .iter() + .map(format_regex_filter) + .collect::>() + .join(", ") ); return true; } } // Handle reject tags from DD_APM_FILTER_TAGS_REJECT (exact match) - if let Some(reject_tags) = &config.apm_filter_tags_reject - && !reject_tags.is_empty() - { - let matches_reject = reject_tags + if !tag_filters.reject.is_empty() { + let matches_reject = tag_filters + .reject .iter() - .any(|filter| span_matches_tag_exact_filter(span, filter)); + .any(|filter| span_matches_exact_filter(span, filter)); if matches_reject { debug!( "TRACE_PROCESSOR | Filtering out span '{}' - matches reject tags {}", span.name, - reject_tags.join(", ") + tag_filters + .reject + .iter() + .map(format_exact_filter) + .collect::>() + .join(", ") ); return true; } } // Handle reject regex tags from DD_APM_FILTER_TAGS_REGEX_REJECT (regex match) - if let Some(reject_regex_tags) = &config.apm_filter_tags_regex_reject - && !reject_regex_tags.is_empty() - { - let matches_reject_regex = reject_regex_tags + if !tag_filters.reject_regex.is_empty() { + let matches_reject_regex = tag_filters + .reject_regex .iter() - .any(|filter| span_matches_tag_regex_filter(span, filter)); + .any(|filter| span_matches_regex_filter(span, filter)); if matches_reject_regex { debug!( "TRACE_PROCESSOR | Filtering out span '{}' - matches reject regex tags {}", span.name, - reject_regex_tags.join(", ") + tag_filters + .reject_regex + .iter() + .map(format_regex_filter) + .collect::>() + .join(", ") ); return true; } @@ -161,36 +293,6 @@ fn filter_span_by_tags(span: &Span, config: &config::Config) -> bool { false } -fn span_matches_tag_exact_filter(span: &Span, filter: &str) -> bool { - let parts: Vec<&str> = filter.splitn(2, ':').collect(); - - if parts.len() == 2 { - let key = parts[0].trim(); - let value = parts[1].trim(); - span_matches_tag_exact(span, key, value) - } else if parts.len() == 1 { - let key = parts[0].trim(); - span_has_key(span, key) - } else { - false - } -} - -fn span_matches_tag_regex_filter(span: &Span, filter: &str) -> bool { - let parts: Vec<&str> = filter.splitn(2, ':').collect(); - - if parts.len() == 2 { - let key = parts[0].trim(); - let value = parts[1].trim(); - span_matches_tag_regex(span, key, value) - } else if parts.len() == 1 { - let key = parts[0].trim(); - span_has_key(span, key) - } else { - false - } -} - fn span_matches_tag_exact(span: &Span, key: &str, value: &str) -> bool { if let Some(span_value) = span.meta.get(key) && span_value == value @@ -215,15 +317,7 @@ fn span_matches_tag_exact(span: &Span, key: &str, value: &str) -> bool { false } -fn span_matches_tag_regex(span: &Span, key: &str, value: &str) -> bool { - let Ok(regex) = Regex::new(value) else { - debug!( - "TRACE_PROCESSOR | Invalid regex pattern '{}' for key '{}', treating as non-match", - value, key - ); - return false; - }; - +fn span_matches_tag_regex(span: &Span, key: &str, regex: &Regex) -> bool { if let Some(span_value) = span.meta.get(key) && regex.is_match(span_value) { @@ -334,10 +428,10 @@ impl TraceProcessor for ServerlessTraceProcessor { traces, &header_tags, &mut ChunkProcessor { - config: config.clone(), obfuscation_config: self.obfuscation_config.clone(), tags_provider: tags_provider.clone(), span_pointers, + tag_filters: self.tag_filters.clone(), }, true, // send agentless since we are the agent ) @@ -657,12 +751,11 @@ mod tests { dropped_p0_spans: 0, }; - let trace_processor = ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }; let config = create_test_config(); + let trace_processor = ServerlessTraceProcessor::new( + config.as_ref(), + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ); let tags_provider = create_tags_provider(config.clone()); let (tracer_payload, _processed_traces) = trace_processor.process_traces( config, @@ -744,15 +837,37 @@ mod tests { ..Default::default() }; - assert!(span_matches_tag_regex(&span, "env", r"prod.*")); - assert!(span_matches_tag_regex(&span, "env", "")); - assert!(!span_matches_tag_regex(&span, "env", r"dev.*")); - - assert!(span_matches_tag_regex(&span, "service", r".*-service")); - assert!(span_matches_tag_regex(&span, "service", "")); - assert!(!span_matches_tag_regex(&span, "service", r"api-.*")); + assert!(span_matches_tag_regex( + &span, + "env", + &Regex::new(r"prod.*").expect("regex pattern should be valid") + )); + assert!(span_matches_tag_regex( + &span, + "env", + &Regex::new("").expect("regex pattern should be valid") + )); + assert!(!span_matches_tag_regex( + &span, + "env", + &Regex::new(r"dev.*").expect("regex pattern should be valid") + )); - assert!(!span_matches_tag_regex(&span, "env", r"[unclosed")); + assert!(span_matches_tag_regex( + &span, + "service", + &Regex::new(r".*-service").expect("regex pattern should be valid") + )); + assert!(span_matches_tag_regex( + &span, + "service", + &Regex::new("").expect("regex pattern should be valid") + )); + assert!(!span_matches_tag_regex( + &span, + "service", + &Regex::new(r"api-.*").expect("regex pattern should be valid") + )); } #[test] @@ -770,8 +885,16 @@ mod tests { assert!(span_matches_tag_exact(&span, "env", "test-environment")); assert!(!span_matches_tag_exact(&span, "env", "test")); - assert!(span_matches_tag_regex(&span, "env", r"test.*")); - assert!(!span_matches_tag_regex(&span, "env", r"prod.*")); + assert!(span_matches_tag_regex( + &span, + "env", + &Regex::new(r"test.*").expect("regex pattern should be valid") + )); + assert!(!span_matches_tag_regex( + &span, + "env", + &Regex::new(r"prod.*").expect("regex pattern should be valid") + )); } #[test] @@ -792,21 +915,30 @@ mod tests { span_match_all .meta .insert("service".to_string(), "api".to_string()); - assert!(!filter_span_by_tags(&span_match_all, &config)); + assert!(!filter_span_by_tags( + &span_match_all, + &TagFilters::from_config(&config) + )); // Span that matches only one of the require tags let mut span_partial_match = Span::default(); span_partial_match .meta .insert("env".to_string(), "production".to_string()); - assert!(filter_span_by_tags(&span_partial_match, &config)); + assert!(filter_span_by_tags( + &span_partial_match, + &TagFilters::from_config(&config) + )); // Span that doesn't match any require tags let mut span_no_match = Span::default(); span_no_match .meta .insert("env".to_string(), "development".to_string()); - assert!(filter_span_by_tags(&span_no_match, &config)); + assert!(filter_span_by_tags( + &span_no_match, + &TagFilters::from_config(&config) + )); } #[test] @@ -824,14 +956,20 @@ mod tests { span_reject .meta .insert("env".to_string(), "production".to_string()); - assert!(filter_span_by_tags(&span_reject, &config)); + assert!(filter_span_by_tags( + &span_reject, + &TagFilters::from_config(&config) + )); // Span that doesn't match any reject tags let mut span_keep = Span::default(); span_keep .meta .insert("env".to_string(), "production".to_string()); - assert!(!filter_span_by_tags(&span_keep, &config)); + assert!(!filter_span_by_tags( + &span_keep, + &TagFilters::from_config(&config) + )); } #[test] @@ -850,14 +988,38 @@ mod tests { span_both .meta .insert("debug".to_string(), "true".to_string()); - assert!(filter_span_by_tags(&span_both, &config)); + assert!(filter_span_by_tags( + &span_both, + &TagFilters::from_config(&config) + )); // Span that matches require and doesn't match reject let mut span_good = Span::default(); span_good .meta .insert("env".to_string(), "production".to_string()); - assert!(!filter_span_by_tags(&span_good, &config)); + assert!(!filter_span_by_tags( + &span_good, + &TagFilters::from_config(&config) + )); + } + + #[test] + fn test_compile_regex_filters() { + let filters = vec![ + "env".to_string(), + "service:^api-.*$".to_string(), + "debug:[unclosed".to_string(), + ]; + let compiled = compile_regex_filters(Some(filters.as_slice())); + + assert_eq!(compiled.len(), 2); + assert!(matches!(&compiled[0], RegexFilter { key, regex: None } if key == "env")); + assert!(matches!( + &compiled[1], + RegexFilter { key, regex: Some(r) } + if key == "service" && r.as_str() == "^api-.*$" + )); } #[test] @@ -920,12 +1082,11 @@ mod tests { }); let mut processor = ChunkProcessor { - config: config.clone(), obfuscation_config: Arc::new( ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), ), tags_provider: Arc::new(Provider::new( - config, + config.clone(), "lambda".to_string(), &std::collections::HashMap::from([( "function_arn".to_string(), @@ -933,6 +1094,7 @@ mod tests { )]), )), span_pointers: None, + tag_filters: Arc::new(TagFilters::from_config(config.as_ref())), }; processor.process(&mut chunk, 0); @@ -1004,12 +1166,11 @@ mod tests { }); let mut processor = ChunkProcessor { - config: config.clone(), obfuscation_config: Arc::new( ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), ), tags_provider: Arc::new(Provider::new( - config, + config.clone(), "lambda".to_string(), &std::collections::HashMap::from([( "function_arn".to_string(), @@ -1017,6 +1178,7 @@ mod tests { )]), )), span_pointers: None, + tag_filters: Arc::new(TagFilters::from_config(config.as_ref())), }; processor.process(&mut chunk, 0); @@ -1087,12 +1249,11 @@ mod tests { }); let mut processor = ChunkProcessor { - config: config.clone(), obfuscation_config: Arc::new( ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), ), tags_provider: Arc::new(Provider::new( - config, + config.clone(), "lambda".to_string(), &std::collections::HashMap::from([( "function_arn".to_string(), @@ -1100,6 +1261,7 @@ mod tests { )]), )), span_pointers: None, + tag_filters: Arc::new(TagFilters::from_config(config.as_ref())), }; processor.process(&mut chunk, 0); @@ -1132,11 +1294,10 @@ mod tests { "test-arn".to_string(), )]), )); - let processor = ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }; + let processor = ServerlessTraceProcessor::new( + config.as_ref(), + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ); let header_tags = tracer_header_tags::TracerHeaderTags { lang: "rust", @@ -1226,11 +1387,10 @@ mod tests { "test-arn".to_string(), )]), )); - let processor = ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }; + let processor = ServerlessTraceProcessor::new( + config.as_ref(), + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ); let header_tags = tracer_header_tags::TracerHeaderTags { lang: "rust", lang_version: "1.0", @@ -1304,11 +1464,10 @@ mod tests { "test-arn".to_string(), )]), )); - let processor = ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }; + let processor = ServerlessTraceProcessor::new( + config.as_ref(), + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ); let header_tags = tracer_header_tags::TracerHeaderTags { lang: "rust", lang_version: "1.0", @@ -1436,11 +1595,10 @@ mod tests { "test-arn".to_string(), )]), )); - let processor = Arc::new(ServerlessTraceProcessor { - obfuscation_config: Arc::new( - ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), - ), - }); + let processor = Arc::new(ServerlessTraceProcessor::new( + config.as_ref(), + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + )); let header_tags = tracer_header_tags::TracerHeaderTags { lang: "rust", lang_version: "1.0", @@ -1484,7 +1642,7 @@ mod tests { None, ) .await - .unwrap(); + .expect("send_processed_traces should not fail in tests"); if expect_stats { assert!(