From 2da582ef9fa7ce20616f80fb8b5eb8fb02a21b54 Mon Sep 17 00:00:00 2001 From: Alessandro Pogliaghi Date: Tue, 10 Feb 2026 13:14:47 +0000 Subject: [PATCH] =?UTF-8?q?feat(capture-logs):=20server-side=20token?= =?UTF-8?q?=E2=86=92team=5Fid=20resolution?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/Cargo.lock | 3 ++ rust/capture-logs/Cargo.toml | 3 ++ rust/capture-logs/src/config.rs | 9 ++++ rust/capture-logs/src/endpoints/datadog.rs | 11 ++++- rust/capture-logs/src/kafka.rs | 5 ++ rust/capture-logs/src/lib.rs | 1 + rust/capture-logs/src/main.rs | 28 ++++++++++- rust/capture-logs/src/service.rs | 15 +++++- rust/capture-logs/src/team_resolver.rs | 54 ++++++++++++++++++++++ 9 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 rust/capture-logs/src/team_resolver.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 838e807bf3e8..c29008d11098 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1679,6 +1679,7 @@ dependencies = [ "clickhouse", "common-alloc", "common-continuous-profiling", + "common-database", "common-metrics", "envconfig", "health", @@ -1686,12 +1687,14 @@ dependencies = [ "jsonwebtoken", "limiters", "metrics", + "moka", "opentelemetry-proto 0.29.0", "prost 0.13.5", "rdkafka", "serde", "serde_derive", "serde_json", + "sqlx", "thiserror 2.0.17", "tokio", "tonic 0.12.3", diff --git a/rust/capture-logs/Cargo.toml b/rust/capture-logs/Cargo.toml index 57fa6feaffe1..f0998edb9805 100644 --- a/rust/capture-logs/Cargo.toml +++ b/rust/capture-logs/Cargo.toml @@ -35,6 +35,9 @@ prost = { workspace = true } bytes = { workspace = true } tower-http = { workspace = true } hex = "0.4" +common-database = { path = "../common/database" } +sqlx = { workspace = true } +moka = { workspace = true } [lints] workspace = true diff --git a/rust/capture-logs/src/config.rs b/rust/capture-logs/src/config.rs index a02db1e147ba..36ee43ac5fcf 100644 --- a/rust/capture-logs/src/config.rs +++ 
b/rust/capture-logs/src/config.rs @@ -28,6 +28,15 @@ pub struct Config { #[envconfig(from = "MAX_REQUEST_BODY_SIZE_BYTES", default = "2097152")] // 2MB (Axum default) pub max_request_body_size_bytes: usize, + + #[envconfig(from = "DATABASE_URL", default = "")] + pub database_url: String, + + #[envconfig(from = "TEAM_RESOLVER_CACHE_TTL_SECS", default = "300")] + pub team_resolver_cache_ttl_secs: u64, + + #[envconfig(from = "TEAM_RESOLVER_MAX_POOL_SIZE", default = "5")] + pub team_resolver_max_pool_size: u32, } impl Config { diff --git a/rust/capture-logs/src/endpoints/datadog.rs b/rust/capture-logs/src/endpoints/datadog.rs index a5e598b736f6..bb2aec26842a 100644 --- a/rust/capture-logs/src/endpoints/datadog.rs +++ b/rust/capture-logs/src/endpoints/datadog.rs @@ -289,8 +289,17 @@ pub async fn export_datadog_logs_http( .map(|log| datadog_log_to_kafka_row(log, &query_params)) .collect(); + let team_id = match &service.team_resolver { + Some(resolver) => resolver.resolve(&token).await, + None => None, + }; + let row_count = rows.len(); - if let Err(e) = service.sink.write(&token, rows, body.len() as u64).await { + if let Err(e) = service + .sink + .write(&token, rows, body.len() as u64, team_id) + .await + { error!("Failed to send logs to Kafka: {}", e); return Err(( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/rust/capture-logs/src/kafka.rs b/rust/capture-logs/src/kafka.rs index 78f504c4f745..d5ed15381562 100644 --- a/rust/capture-logs/src/kafka.rs +++ b/rust/capture-logs/src/kafka.rs @@ -194,6 +194,7 @@ impl KafkaSink { token: &str, rows: Vec, uncompressed_bytes: u64, + team_id: Option, ) -> Result<(), anyhow::Error> { if rows.is_empty() { return Ok(()); @@ -245,6 +246,10 @@ impl KafkaSink { key: "batch_uuid", value: Some(&uuid::Uuid::new_v4().to_string()), }) + .insert(Header { + key: "team_id", + value: team_id.map(|id| id.to_string()).as_deref(), + }) }), }) { Err((err, _)) => Err(anyhow!(format!("kafka error: {err}"))), diff --git a/rust/capture-logs/src/lib.rs 
b/rust/capture-logs/src/lib.rs index daa7600a5303..8b29bb6ab3ee 100644 --- a/rust/capture-logs/src/lib.rs +++ b/rust/capture-logs/src/lib.rs @@ -5,3 +5,4 @@ pub mod endpoints; pub mod kafka; pub mod log_record; pub mod service; +pub mod team_resolver; diff --git a/rust/capture-logs/src/main.rs b/rust/capture-logs/src/main.rs index 6a93b345a655..a5813b6c723f 100644 --- a/rust/capture-logs/src/main.rs +++ b/rust/capture-logs/src/main.rs @@ -8,6 +8,7 @@ use capture_logs::endpoints::datadog; use capture_logs::kafka::KafkaSink; use capture_logs::service::Service; use capture_logs::service::{export_logs_http, options_handler}; +use capture_logs::team_resolver::TeamResolver; use common_metrics::setup_metrics_routes; use std::future::ready; use std::net::SocketAddr; @@ -104,7 +105,32 @@ async fn main() { let token_dropper = TokenDropper::new(&config.drop_events_by_token.unwrap_or_default()); let token_dropper_arc = Arc::new(token_dropper); - let logs_service = match Service::new(kafka_sink, token_dropper_arc).await { + + let team_resolver = if !config.database_url.is_empty() { + let pool = common_database::get_pool_with_config( + &config.database_url, + common_database::PoolConfig { + max_connections: config.team_resolver_max_pool_size, + acquire_timeout: Duration::from_secs(2), + statement_timeout_ms: Some(1000), + ..Default::default() + }, + ) + .expect("Failed to create team resolver DB pool"); + info!( + "Team resolver enabled with DB pool (max_connections={})", + config.team_resolver_max_pool_size + ); + Some(Arc::new(TeamResolver::new( + pool, + config.team_resolver_cache_ttl_secs, + ))) + } else { + info!("DATABASE_URL not set, team_id resolution disabled"); + None + }; + + let logs_service = match Service::new(kafka_sink, token_dropper_arc, team_resolver).await { Ok(service) => service, Err(e) => { error!("Failed to initialize log service: {}", e); diff --git a/rust/capture-logs/src/service.rs b/rust/capture-logs/src/service.rs index 26ae9589da13..fd6722e54898 
100644 --- a/rust/capture-logs/src/service.rs +++ b/rust/capture-logs/src/service.rs @@ -16,6 +16,7 @@ use std::io::Write; use std::sync::Arc; use crate::kafka::KafkaSink; +use crate::team_resolver::TeamResolver; use tracing::{debug, error, instrument}; @@ -95,6 +96,7 @@ pub fn parse_otel_message(json_bytes: &Bytes) -> Result, + pub(crate) team_resolver: Option<Arc<TeamResolver>>, } #[derive(Deserialize)] @@ -106,10 +108,12 @@ impl Service { pub async fn new( kafka_sink: KafkaSink, token_dropper: Arc<TokenDropper>, + team_resolver: Option<Arc<TeamResolver>>, ) -> Result { Ok(Self { sink: kafka_sink, token_dropper, + team_resolver, }) } } @@ -234,8 +238,17 @@ pub async fn export_logs_http( } } + let team_id = match &service.team_resolver { + Some(resolver) => resolver.resolve(token).await, + None => None, + }; + let row_count = rows.len(); - if let Err(e) = service.sink.write(token, rows, body.len() as u64).await { + if let Err(e) = service + .sink + .write(token, rows, body.len() as u64, team_id) + .await + { error!("Failed to send logs to Kafka: {}", e); return Err(( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/rust/capture-logs/src/team_resolver.rs b/rust/capture-logs/src/team_resolver.rs new file mode 100644 index 000000000000..8b6e350e9f88 --- /dev/null +++ b/rust/capture-logs/src/team_resolver.rs @@ -0,0 +1,54 @@ +use moka::future::Cache; +use sqlx::PgPool; +use std::sync::Arc; +use std::time::Duration; +use tracing::{debug, warn}; + +pub struct TeamResolver { + pool: PgPool, + cache: Cache<String, Option<i32>>, +} + +impl TeamResolver { + pub fn new(pool: PgPool, cache_ttl_secs: u64) -> Self { + let cache = Cache::builder() + .max_capacity(10_000) + .time_to_live(Duration::from_secs(cache_ttl_secs)) + .build(); + Self { pool, cache } + } + + pub async fn resolve(&self, token: &str) -> Option<i32> { + let token_owned = token.to_string(); + let pool = self.pool.clone(); + match self + .cache + .try_get_with(token_owned.clone(), async { + Self::lookup(&pool, &token_owned).await + }) + .await + { + Ok(team_id) => team_id, + Err(e) =>
{ + warn!("team_id lookup failed: {e}"); + None + } + } + } + + async fn lookup(pool: &PgPool, token: &str) -> Result<Option<i32>, Arc<sqlx::Error>> { + let row: Option<(i32,)> = + sqlx::query_as("SELECT id FROM posthog_team WHERE api_token = $1") + .bind(token) + .fetch_optional(pool) + .await + .map_err(Arc::new)?; + let prefix_len = 8.min(token.len()); + debug!( + token_prefix = &token[..prefix_len], + team_id = ?row, + "resolved token" + ); + Ok(row.map(|(id,)| id)) + } +}