diff --git a/control/src/admin/local_query.rs b/control/src/admin/local_query.rs index 94b1173..aa21d8b 100644 --- a/control/src/admin/local_query.rs +++ b/control/src/admin/local_query.rs @@ -14,7 +14,6 @@ use crate::proxy::rate_limiter::RateLimiter; use crate::proxy::router::Router; /// Local query implementation that reads from shared gateway state -#[allow(dead_code)] pub struct LocalGatewayQuery { router: Arc, circuit_breaker: Arc, @@ -42,13 +41,15 @@ impl GatewayQuery for LocalGatewayQuery { async fn snapshot(&self) -> anyhow::Result { let route_count = self.router.route_count(); let uptime = self.start_time.elapsed().as_secs(); + let open_circuits = self.circuit_breaker.open_count(); + let exhausted_rate_limiters = self.rate_limiter.exhausted_count(); Ok(GatewaySnapshot { status: "ok".to_string(), uptime_seconds: uptime, route_count, - open_circuits: 0, // Would need snapshot from CircuitBreakerManager - exhausted_rate_limiters: 0, + open_circuits, + exhausted_rate_limiters, listeners: vec![], cache_stats: Some(self.cache_stats_internal()), }) @@ -56,33 +57,62 @@ impl GatewayQuery for LocalGatewayQuery { async fn list_routes( &self, - _method_filter: Option<&str>, - _path_prefix: Option<&str>, + method_filter: Option<&str>, + path_prefix: Option<&str>, ) -> anyhow::Result> { - // The router doesn't expose a full route list yet — placeholder - Ok(vec![]) + let mut routes = self.router.list_routes(); + + // Apply filters + if let Some(method) = method_filter { + let method_upper = method.to_uppercase(); + routes.retain(|r| r.method == method_upper); + } + if let Some(prefix) = path_prefix { + routes.retain(|r| r.pattern.starts_with(prefix)); + } + + // Sort by pattern for stable output + routes.sort_by(|a, b| a.pattern.cmp(&b.pattern)); + Ok(routes) } - async fn get_route(&self, _pattern: &str) -> anyhow::Result> { - Ok(None) + async fn get_route(&self, pattern: &str) -> anyhow::Result> { + let routes = self.router.list_routes(); + Ok(routes.into_iter().find(|r| r.pattern == pattern)) } async fn list_circuit_breakers( &self, - _state_filter: Option<&str>, + state_filter: Option<&str>, ) -> anyhow::Result> { - // CircuitBreakerManager doesn't expose iteration yet — placeholder - Ok(vec![]) + let mut breakers = self.circuit_breaker.snapshot_all(); + + if let Some(state) = state_filter { + let state_upper = state.to_uppercase(); + // Match "OPEN", "CLOSED", "HALFOPEN" + breakers.retain(|b| b.state.to_uppercase() == state_upper); + } + + breakers.sort_by(|a, b| a.backend_id.cmp(&b.backend_id)); + Ok(breakers) } async fn list_rate_limiters( &self, - _route_filter: Option<&str>, + route_filter: Option<&str>, ) -> anyhow::Result> { - Ok(vec![]) + let mut limiters = self.rate_limiter.snapshot_all(); + + if let Some(route) = route_filter { + limiters.retain(|l| l.route.contains(route)); + } + + limiters.sort_by(|a, b| a.route.cmp(&b.route)); + Ok(limiters) } async fn list_listeners(&self) -> anyhow::Result> { + // ListenerManager is not wired into LocalGatewayQuery yet Ok(vec![]) } @@ -106,11 +136,15 @@ impl GatewayQuery for LocalGatewayQuery { use agent_api::diagnostics::engine::{DiagnosticContext, DiagnosticsEngine}; let snapshot = self.snapshot().await?; + let routes = self.router.list_routes(); + let circuit_breakers = self.circuit_breaker.snapshot_all(); + let rate_limiters = self.rate_limiter.snapshot_all(); + let ctx = DiagnosticContext { snapshot, - routes: vec![], - circuit_breakers: vec![], - rate_limiters: vec![], + routes, + circuit_breakers, + rate_limiters, }; let engine = DiagnosticsEngine::with_builtin_rules(); diff --git a/control/src/error.rs b/control/src/error.rs index 79433ba..ca1ab8b 100644 --- a/control/src/error.rs +++ b/control/src/error.rs @@ -14,23 +14,25 @@ pub enum RautaError { Io(#[from] std::io::Error), } -/// Proxy request errors with structured discrimination (replaces string-based error matching) -#[allow(dead_code)] +/// Proxy request errors with structured discrimination +/// +/// Replaces string-based error matching (`e.starts_with("TIMEOUT:")`) with +/// proper enum variants. Each variant maps to a specific HTTP status code. #[derive(Debug)] +#[allow(dead_code)] pub enum ProxyError { - /// Backend request or overall request timeout exceeded + /// Backend request or overall request timeout exceeded → 504 Timeout { message: String }, - /// Backend connection or protocol error + /// Backend connection or protocol error → 502 BackendError { message: String }, - /// Request body too large + /// Request body too large → 413 BodyTooLarge { size: usize, max: usize }, - /// Filter application failed + /// Filter application failed → 500 FilterError { message: String }, } #[allow(dead_code)] impl ProxyError { - /// HTTP status code for this error pub fn status_code(&self) -> u16 { match self { ProxyError::Timeout { .. } => 504, @@ -43,17 +45,39 @@ impl ProxyError { pub fn is_timeout(&self) -> bool { matches!(self, ProxyError::Timeout { .. }) } + + pub fn status_str(&self) -> &'static str { + match self { + ProxyError::Timeout { .. } => "504", + ProxyError::BackendError { .. } => "502", + ProxyError::BodyTooLarge { .. } => "413", + ProxyError::FilterError { .. } => "500", + } + } } impl std::fmt::Display for ProxyError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ProxyError::Timeout { message } => write!(f, "TIMEOUT: {}", message), + ProxyError::Timeout { message } => write!(f, "{}", message), ProxyError::BackendError { message } => write!(f, "{}", message), ProxyError::BodyTooLarge { size, max } => { write!(f, "Request body too large: {} bytes (max {})", size, max) } - ProxyError::FilterError { message } => write!(f, "Filter error: {}", message), + ProxyError::FilterError { message } => write!(f, "{}", message), } } } + +impl std::error::Error for ProxyError {} + +/// Allow `?` on String errors (legacy compatibility during migration). +/// +/// Maps all string errors to `BackendError` (502). This is a transitional shim — +/// callers should construct specific ProxyError variants directly. Will be removed +/// once all error sites are migrated. +impl From for ProxyError { + fn from(s: String) -> Self { + ProxyError::BackendError { message: s } + } +} diff --git a/control/src/proxy/circuit_breaker.rs b/control/src/proxy/circuit_breaker.rs index e5a0f68..bdfe0e1 100644 --- a/control/src/proxy/circuit_breaker.rs +++ b/control/src/proxy/circuit_breaker.rs @@ -687,6 +687,30 @@ impl CircuitBreakerManager { snapshot.get(backend_id).map(|b| b.state()) } + /// Snapshot all circuit breakers for the admin API/CLI/MCP + /// + /// Lock-free read via ArcSwap::load(). Returns a snapshot of all tracked backends. + pub fn snapshot_all(&self) -> Vec { + let snapshot = self.breakers.load(); + snapshot + .iter() + .map(|(id, breaker)| agent_api::types::CircuitBreakerSnapshot { + backend_id: id.clone(), + state: state_to_str(breaker.state()).to_string(), + failure_count: breaker.failure_count(), + }) + .collect() + } + + /// Count circuit breakers in Open state + pub fn open_count(&self) -> usize { + let snapshot = self.breakers.load(); + snapshot + .values() + .filter(|b| b.state() == CircuitState::Open) + .count() + } + /// Remove circuit breaker for backend #[allow(dead_code)] pub fn remove_backend(&self, backend_id: &str) { diff --git a/control/src/proxy/forwarder.rs b/control/src/proxy/forwarder.rs index 6ab13c2..e915fc7 100644 --- a/control/src/proxy/forwarder.rs +++ b/control/src/proxy/forwarder.rs @@ -4,6 +4,7 @@ //! Includes protocol detection (HTTP/1.1 vs HTTP/2), connection pooling, //! timeout enforcement, and hop-by-hop header filtering. +use crate::error::ProxyError; use crate::proxy::backend_pool::{BackendConnectionPools, PoolError}; use crate::proxy::filters::Timeout; use crate::proxy::worker::Worker; @@ -76,7 +77,7 @@ pub async fn forward_to_backend( workers: Option, worker_index: Option, timeout_config: Option<&Timeout>, -) -> Result>, String> { +) -> Result>, ProxyError> { let request_start = Instant::now(); // Read the incoming request body @@ -95,10 +96,24 @@ pub async fn forward_to_backend( Bytes::new() // Empty body, zero allocation } else { // Slow path: POST/PUT/PATCH may have body - // Enforce max body size to prevent OOM from unbounded requests (10MB default) + // Stream body with size limit to prevent OOM (10MB default). + // Enforced during reading — we stop accepting data once the cap is reached. const MAX_BODY_SIZE: usize = 10 * 1024 * 1024; let body_read_start = Instant::now(); - let collected = body.collect().await.map_err(|e| { + let limited = http_body_util::Limited::new(body, MAX_BODY_SIZE); + let collected = limited.collect().await.map_err(|e| { + // Check if this is a LengthLimitError from http_body_util::Limited + if e.is::() { + error!( + request_id = %request_id, + max_size = MAX_BODY_SIZE, + "Request body exceeded size limit" + ); + return ProxyError::BodyTooLarge { + size: 0, // Actual size unknown — reading was aborted at limit + max: MAX_BODY_SIZE, + }; + } error!( request_id = %request_id, error.message = %e, @@ -106,18 +121,12 @@ pub async fn forward_to_backend( elapsed_us = body_read_start.elapsed().as_micros() as u64, "Failed to read request body" ); - format!("Failed to read request body: {}", e) + ProxyError::BackendError { + message: format!("Failed to read request body: {}", e), + } })?; let bytes = collected.to_bytes(); - if bytes.len() > MAX_BODY_SIZE { - return Err(format!( - "Request body too large: {} bytes (max {})", - bytes.len(), - MAX_BODY_SIZE - )); - } - let body_read_duration = body_read_start.elapsed(); info!( request_id = %request_id, @@ -387,10 +396,12 @@ pub async fn forward_to_backend( network.peer.address = %backend, "Backend request timeout exceeded" ); - return Err(format!( - "TIMEOUT: Backend request exceeded {}ms timeout", - timeout_duration.as_millis() - )); + return Err(ProxyError::Timeout { + message: format!( + "Backend request exceeded {}ms timeout", + timeout_duration.as_millis() + ), + }); } } } else { diff --git a/control/src/proxy/rate_limiter.rs b/control/src/proxy/rate_limiter.rs index a0b501c..da59f32 100644 --- a/control/src/proxy/rate_limiter.rs +++ b/control/src/proxy/rate_limiter.rs @@ -262,6 +262,16 @@ impl TokenBucket { fixed_to_float(tokens).min(self.capacity_f64) } + /// Get burst capacity + pub fn capacity(&self) -> f64 { + self.capacity_f64 + } + + /// Get refill rate in tokens per second + pub fn refill_rate(&self) -> f64 { + fixed_to_float(self.refill_rate_per_ms_fixed) * 1000.0 + } + /// Reset bucket to full capacity (for testing) #[cfg(test)] #[allow(dead_code)] @@ -363,6 +373,29 @@ impl RateLimiter { let snapshot = self.buckets.load(); snapshot.get(route).map(|bucket| bucket.available_tokens()) } + + /// Snapshot all rate limiter buckets for the admin API/CLI/MCP + pub fn snapshot_all(&self) -> Vec { + let snapshot = self.buckets.load(); + snapshot + .iter() + .map(|(route, bucket)| agent_api::types::RateLimiterSnapshot { + route: route.clone(), + tokens_available: bucket.available_tokens(), + capacity: bucket.capacity(), + refill_rate: bucket.refill_rate(), + }) + .collect() + } + + /// Count buckets with zero tokens (actively rate limiting) + pub fn exhausted_count(&self) -> usize { + let snapshot = self.buckets.load(); + snapshot + .values() + .filter(|b| b.available_tokens() <= 0.0) + .count() + } } impl Default for RateLimiter { diff --git a/control/src/proxy/request_handler.rs b/control/src/proxy/request_handler.rs index ef6bd0e..d0f55b6 100644 --- a/control/src/proxy/request_handler.rs +++ b/control/src/proxy/request_handler.rs @@ -4,6 +4,7 @@ //! filter application, and response generation. use crate::apis::metrics::CONTROLLER_METRICS_REGISTRY; +use crate::error::ProxyError; use crate::proxy::backend_pool::gather_pool_metrics; use crate::proxy::circuit_breaker::CircuitBreakerManager; use crate::proxy::filters::{ @@ -130,7 +131,7 @@ pub fn apply_response_filters( pub fn build_redirect_response( req: &Request, redirect: &RequestRedirect, -) -> Result>, String> { +) -> Result>, ProxyError> { let uri = req.uri(); // Extract components with fallbacks @@ -173,7 +174,9 @@ pub fn build_redirect_response( .map_err(|never| match never {}) .boxed(), ) - .map_err(|e| format!("Failed to build redirect response: {}", e)) + .map_err(|e| ProxyError::BackendError { + message: format!("Failed to build redirect response: {}", e), + }) } /// Convert HttpMethod to static string (zero allocations) @@ -224,7 +227,7 @@ pub async fn handle_request( worker_selector: Option>, rate_limiter: Arc, circuit_breaker: Arc, -) -> Result>, String> { +) -> Result>, ProxyError> { // Generate or extract request ID let request_id = req .headers() @@ -234,6 +237,11 @@ pub async fn handle_request( .unwrap_or_else(|| Uuid::new_v4().to_string()); let path = req.uri().path().to_string(); + let path_and_query = req + .uri() + .path_and_query() + .map(|pq| pq.as_str().to_string()) + .unwrap_or_else(|| path.clone()); let method_name = req.method().as_str(); info!( @@ -341,7 +349,7 @@ pub async fn handle_request( req, &route_match, method, - &path, + &path_and_query, &request_id, client, backend_pools, @@ -380,7 +388,7 @@ pub async fn handle_request( } /// Serve the /metrics endpoint -async fn serve_metrics_endpoint() -> Result>, String> { +async fn serve_metrics_endpoint() -> Result>, ProxyError> { // Force initialization of lazy_static metrics let _ = &*HTTP_REQUEST_DURATION; let _ = &*HTTP_REQUESTS_TOTAL; @@ -441,7 +449,7 @@ async fn serve_metrics_endpoint() -> Result Result>, String> { +fn serve_healthz_endpoint() -> Result>, ProxyError> { #[allow(clippy::unwrap_used)] Ok(Response::builder() .status(StatusCode::OK) @@ -462,7 +470,7 @@ fn serve_healthz_endpoint() -> Result>, St /// - routes: number of configured routes fn serve_status_endpoint( router: &Router, -) -> Result>, String> { +) -> Result>, ProxyError> { let uptime = START_TIME.elapsed().as_secs(); let routes = router.route_count(); @@ -497,7 +505,7 @@ async fn execute_request_with_retry( protocol_cache: ProtocolCache, workers: Option, worker_index: Option, -) -> Result>, String> { +) -> Result>, ProxyError> { let overall_request_timeout = route_match.timeout.as_deref().and_then(|t| t.request); let retry_config = route_match.retry.as_deref(); let is_retryable_method = @@ -550,7 +558,7 @@ async fn execute_with_retry( overall_timeout: Option, retry_cfg: &RetryConfig, method: common::HttpMethod, -) -> Result>, String> { +) -> Result>, ProxyError> { let max_attempts = retry_cfg.max_retries + 1; // Save original headers for retry requests (fix: retry was dropping all headers except Host) @@ -575,10 +583,12 @@ async fn execute_with_retry( .await { Ok(result) => result, - Err(_elapsed) => Err(format!( - "TIMEOUT: Request exceeded {}ms overall timeout", - timeout_duration.as_millis() - )), + Err(_elapsed) => Err(ProxyError::Timeout { + message: format!( + "Request exceeded {}ms overall timeout", + timeout_duration.as_millis() + ), + }), } } else { forward_to_backend( @@ -619,12 +629,8 @@ async fn execute_with_retry( ); tokio::time::sleep(backoff_delay).await; - // Rebuild request for retry - let backend_uri = format!( - "http://{}/{}", - route_match.backend, - path.trim_start_matches('/') - ); + // Rebuild request for retry (preserves query string from original request) + let backend_uri = format!("http://{}{}", route_match.backend, path); let retry_method = match method { common::HttpMethod::GET => hyper::Method::GET, @@ -653,7 +659,9 @@ async fn execute_with_retry( let retry_req = match retry_builder.body(Full::new(Bytes::new())) { Ok(r) => r, Err(e) => { - last_result = Err(format!("Failed to build retry request: {}", e)); + last_result = Err(ProxyError::BackendError { + message: format!("Failed to build retry request: {}", e), + }); break; } }; @@ -663,7 +671,9 @@ async fn execute_with_retry( let (parts, body) = resp.into_parts(); Ok(Response::from_parts(parts, body.map_err(|e| e).boxed())) } - Err(e) => Err(format!("Retry request failed: {}", e)), + Err(e) => Err(ProxyError::BackendError { + message: format!("Retry request failed: {}", e), + }), }; let should_continue = match &retry_result { @@ -694,7 +704,7 @@ async fn execute_without_retry( workers: Option, worker_index: Option, overall_timeout: Option, -) -> Result>, String> { +) -> Result>, ProxyError> { if let Some(timeout_duration) = overall_timeout { match tokio::time::timeout( timeout_duration, @@ -719,10 +729,12 @@ async fn execute_without_retry( timeout_ms = timeout_duration.as_millis(), "Overall request timeout exceeded" ); - Err(format!( - "TIMEOUT: Request exceeded {}ms overall timeout", - timeout_duration.as_millis() - )) + Err(ProxyError::Timeout { + message: format!( + "Request exceeded {}ms overall timeout", + timeout_duration.as_millis() + ), + }) } } } else { @@ -744,7 +756,7 @@ async fn execute_without_retry( /// Record request metrics #[allow(clippy::too_many_arguments)] fn record_request_metrics( - result: &Result>, String>, + result: &Result>, ProxyError>, method: common::HttpMethod, route_pattern: &str, worker_index: Option, @@ -780,12 +792,9 @@ fn record_request_metrics( circuit_breaker.record_success(backend_id); } } - Err(e) => { - let (status_code, status_str) = if e.starts_with("TIMEOUT:") { - (504_u16, "504") - } else { - (500_u16, "500") - }; + Err(ref e) => { + let status_code = e.status_code(); + let status_str = e.status_str(); HTTP_REQUESTS_TOTAL .with_label_values(&[method_str, route_pattern, status_str, &worker_label]) @@ -794,8 +803,12 @@ fn record_request_metrics( .with_label_values(&[method_str, route_pattern, status_str]) .observe(duration.as_secs_f64()); - router.record_backend_response(backend, status_code); - circuit_breaker.record_failure(backend_id); + // Only count backend-originated errors against health/circuit breaker. + // Client-side errors (BodyTooLarge) are not the backend's fault. + if !matches!(e, ProxyError::BodyTooLarge { .. }) { + router.record_backend_response(backend, status_code); + circuit_breaker.record_failure(backend_id); + } error!( request_id = %request_id, @@ -837,10 +850,10 @@ fn record_not_found_metrics( /// Finalize response with optional response filters fn finalize_response( - result: Result>, String>, + result: Result>, ProxyError>, response_filters: Option<&ResponseHeaderModifier>, request_id: &str, -) -> Result>, String> { +) -> Result>, ProxyError> { if let Some(filters) = response_filters { match result { Ok(mut resp) => { @@ -864,16 +877,13 @@ fn finalize_response( } } -/// Convert error string to HTTP response +/// Convert ProxyError to HTTP response fn error_to_response( - error: String, + error: ProxyError, request_id: &str, -) -> Result>, String> { - let status = if error.starts_with("TIMEOUT:") { - StatusCode::GATEWAY_TIMEOUT - } else { - StatusCode::INTERNAL_SERVER_ERROR - }; +) -> Result>, ProxyError> { + let status = + StatusCode::from_u16(error.status_code()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); #[allow(clippy::unwrap_used)] Ok(Response::builder() @@ -881,7 +891,7 @@ fn error_to_response( .header("Content-Type", "text/plain") .header("X-Request-ID", request_id) .body( - Full::new(Bytes::from(error)) + Full::new(Bytes::from(error.to_string())) .map_err(|never| match never {}) .boxed(), ) @@ -892,7 +902,7 @@ fn error_to_response( fn build_error_response( status: StatusCode, message: String, -) -> Result>, String> { +) -> Result>, ProxyError> { #[allow(clippy::unwrap_used)] Ok(Response::builder() .status(status) @@ -908,7 +918,7 @@ fn build_error_response( /// Build 404 not found response fn build_not_found_response( request_id: &str, -) -> Result>, String> { +) -> Result>, ProxyError> { #[allow(clippy::unwrap_used)] Ok(Response::builder() .status(StatusCode::NOT_FOUND) @@ -922,7 +932,7 @@ fn build_not_found_response( } /// Build rate limit exceeded response -fn build_rate_limit_response() -> Result>, String> { +fn build_rate_limit_response() -> Result>, ProxyError> { #[allow(clippy::unwrap_used)] Ok(Response::builder() .status(StatusCode::TOO_MANY_REQUESTS) @@ -937,7 +947,7 @@ fn build_rate_limit_response() -> Result>, } /// Build circuit breaker open response -fn build_circuit_breaker_response() -> Result>, String> { +fn build_circuit_breaker_response() -> Result>, ProxyError> { #[allow(clippy::unwrap_used)] Ok(Response::builder() .status(StatusCode::SERVICE_UNAVAILABLE) diff --git a/control/src/proxy/router.rs b/control/src/proxy/router.rs index 45fe9d5..e5d955e 100644 --- a/control/src/proxy/router.rs +++ b/control/src/proxy/router.rs @@ -405,6 +405,74 @@ impl Router { routes.len() } + /// List all configured routes as agent-api snapshots + /// + /// Reads the route table under a brief read lock and converts internal + /// Route structs to public RouteSnapshot types for the admin API/CLI/MCP. + pub fn list_routes(&self) -> Vec { + let routes = safe_read(&self.routes); + let health_snapshot = self.health.load(); + + routes + .iter() + .map(|(key, route)| { + let method_str = match key.method { + HttpMethod::GET => "GET", + HttpMethod::POST => "POST", + HttpMethod::PUT => "PUT", + HttpMethod::DELETE => "DELETE", + HttpMethod::HEAD => "HEAD", + HttpMethod::OPTIONS => "OPTIONS", + HttpMethod::PATCH => "PATCH", + HttpMethod::ALL => "ALL", + }; + + let backends = route + .backends + .iter() + .map(|b| { + let address = if b.is_ipv6() { + b.as_ipv6() + .map(|ip| format!("[{}]", ip)) + .unwrap_or_default() + } else { + b.as_ipv4().map(|ip| ip.to_string()).unwrap_or_default() + }; + + let is_draining = health_snapshot.draining_backends.contains_key(b); + let health_score = health_snapshot.backend_health.get(b).map(|h| { + if h.is_healthy() { + 1.0 + } else { + 0.0 + } + }); + + agent_api::types::BackendSnapshot { + address, + port: b.port, + weight: b.weight, + is_ipv6: b.is_ipv6(), + is_draining, + health_score, + } + }) + .collect(); + + agent_api::types::RouteSnapshot { + pattern: route.pattern.to_string(), + method: method_str.to_string(), + backends, + has_request_filters: route.request_filters.is_some(), + has_response_filters: route.response_filters.is_some(), + has_redirect: route.redirect.is_some(), + has_timeout: route.timeout.is_some(), + has_retry: route.retry.is_some(), + } + }) + .collect() + } + /// Add or update route with backends (idempotent) /// /// If the route already exists with the same backends, this is a no-op.