Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 51 additions & 17 deletions control/src/admin/local_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::proxy::rate_limiter::RateLimiter;
use crate::proxy::router::Router;

/// Local query implementation that reads from shared gateway state
#[allow(dead_code)]
pub struct LocalGatewayQuery {
router: Arc<Router>,
circuit_breaker: Arc<CircuitBreakerManager>,
Expand Down Expand Up @@ -42,47 +41,78 @@ impl GatewayQuery for LocalGatewayQuery {
async fn snapshot(&self) -> anyhow::Result<GatewaySnapshot> {
let route_count = self.router.route_count();
let uptime = self.start_time.elapsed().as_secs();
let open_circuits = self.circuit_breaker.open_count();
let exhausted_rate_limiters = self.rate_limiter.exhausted_count();

Ok(GatewaySnapshot {
status: "ok".to_string(),
uptime_seconds: uptime,
route_count,
open_circuits: 0, // Would need snapshot from CircuitBreakerManager
exhausted_rate_limiters: 0,
open_circuits,
exhausted_rate_limiters,
listeners: vec![],
cache_stats: Some(self.cache_stats_internal()),
})
}

async fn list_routes(
&self,
_method_filter: Option<&str>,
_path_prefix: Option<&str>,
method_filter: Option<&str>,
path_prefix: Option<&str>,
) -> anyhow::Result<Vec<RouteSnapshot>> {
// The router doesn't expose a full route list yet — placeholder
Ok(vec![])
let mut routes = self.router.list_routes();

// Apply filters
if let Some(method) = method_filter {
let method_upper = method.to_uppercase();
routes.retain(|r| r.method == method_upper);
}
if let Some(prefix) = path_prefix {
routes.retain(|r| r.pattern.starts_with(prefix));
}

// Sort by pattern for stable output
routes.sort_by(|a, b| a.pattern.cmp(&b.pattern));
Ok(routes)
}

async fn get_route(&self, _pattern: &str) -> anyhow::Result<Option<RouteSnapshot>> {
Ok(None)
async fn get_route(&self, pattern: &str) -> anyhow::Result<Option<RouteSnapshot>> {
let routes = self.router.list_routes();
Ok(routes.into_iter().find(|r| r.pattern == pattern))
}

async fn list_circuit_breakers(
&self,
_state_filter: Option<&str>,
state_filter: Option<&str>,
) -> anyhow::Result<Vec<CircuitBreakerSnapshot>> {
// CircuitBreakerManager doesn't expose iteration yet — placeholder
Ok(vec![])
let mut breakers = self.circuit_breaker.snapshot_all();

if let Some(state) = state_filter {
let state_upper = state.to_uppercase();
// Match "OPEN", "CLOSED", "HALFOPEN"
breakers.retain(|b| b.state.to_uppercase() == state_upper);
}

breakers.sort_by(|a, b| a.backend_id.cmp(&b.backend_id));
Ok(breakers)
}

async fn list_rate_limiters(
&self,
_route_filter: Option<&str>,
route_filter: Option<&str>,
) -> anyhow::Result<Vec<RateLimiterSnapshot>> {
Ok(vec![])
let mut limiters = self.rate_limiter.snapshot_all();

if let Some(route) = route_filter {
limiters.retain(|l| l.route.contains(route));
}

limiters.sort_by(|a, b| a.route.cmp(&b.route));
Ok(limiters)
}

/// Listener listing for the admin query interface.
///
/// Always succeeds with an empty list: ListenerManager is not wired into
/// LocalGatewayQuery yet, so there is nothing to report.
async fn list_listeners(&self) -> anyhow::Result<Vec<ListenerSnapshot>> {
    let no_listeners = Vec::new();
    Ok(no_listeners)
}

Expand All @@ -106,11 +136,15 @@ impl GatewayQuery for LocalGatewayQuery {
use agent_api::diagnostics::engine::{DiagnosticContext, DiagnosticsEngine};

let snapshot = self.snapshot().await?;
let routes = self.router.list_routes();
let circuit_breakers = self.circuit_breaker.snapshot_all();
let rate_limiters = self.rate_limiter.snapshot_all();

let ctx = DiagnosticContext {
snapshot,
routes: vec![],
circuit_breakers: vec![],
rate_limiters: vec![],
routes,
circuit_breakers,
rate_limiters,
};

let engine = DiagnosticsEngine::with_builtin_rules();
Expand Down
42 changes: 33 additions & 9 deletions control/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@ pub enum RautaError {
Io(#[from] std::io::Error),
}

/// Proxy request errors with structured discrimination (replaces string-based error matching)
#[allow(dead_code)]
/// Proxy request errors with structured discrimination
///
/// Replaces string-based error matching (`e.starts_with("TIMEOUT:")`) with
/// proper enum variants. Each variant maps to a specific HTTP status code.
#[derive(Debug)]
#[allow(dead_code)]
pub enum ProxyError {
/// Backend request or overall request timeout exceeded
/// Backend request or overall request timeout exceeded → 504
Timeout { message: String },
/// Backend connection or protocol error
/// Backend connection or protocol error → 502
BackendError { message: String },
/// Request body too large
/// Request body too large → 413
BodyTooLarge { size: usize, max: usize },
/// Filter application failed
/// Filter application failed → 500
FilterError { message: String },
}

#[allow(dead_code)]
impl ProxyError {
/// HTTP status code for this error
pub fn status_code(&self) -> u16 {
match self {
ProxyError::Timeout { .. } => 504,
Expand All @@ -43,17 +45,39 @@ impl ProxyError {
/// Returns `true` only when this error is the `Timeout` variant.
pub fn is_timeout(&self) -> bool {
    // Spelled out per variant so that adding a new variant forces an explicit
    // decision here instead of silently falling into a wildcard.
    match self {
        ProxyError::Timeout { .. } => true,
        ProxyError::BackendError { .. }
        | ProxyError::BodyTooLarge { .. }
        | ProxyError::FilterError { .. } => false,
    }
}

pub fn status_str(&self) -> &'static str {
match self {
ProxyError::Timeout { .. } => "504",
ProxyError::BackendError { .. } => "502",
ProxyError::BodyTooLarge { .. } => "413",
ProxyError::FilterError { .. } => "500",
}
}
}

impl std::fmt::Display for ProxyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ProxyError::Timeout { message } => write!(f, "TIMEOUT: {}", message),
ProxyError::Timeout { message } => write!(f, "{}", message),
ProxyError::BackendError { message } => write!(f, "{}", message),
ProxyError::BodyTooLarge { size, max } => {
write!(f, "Request body too large: {} bytes (max {})", size, max)
}
ProxyError::FilterError { message } => write!(f, "Filter error: {}", message),
ProxyError::FilterError { message } => write!(f, "{}", message),
}
}
}

// Empty impl: no trait methods are overridden, so the trait's defaults apply.
impl std::error::Error for ProxyError {}

/// Allow `?` on String errors (legacy compatibility during migration).
///
/// Maps all string errors to `BackendError` (502). This is a transitional shim —
/// callers should construct specific ProxyError variants directly. Will be removed
/// once all error sites are migrated.
impl From<String> for ProxyError {
fn from(s: String) -> Self {
ProxyError::BackendError { message: s }
}
}
24 changes: 24 additions & 0 deletions control/src/proxy/circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,30 @@ impl CircuitBreakerManager {
snapshot.get(backend_id).map(|b| b.state())
}

/// Build one `CircuitBreakerSnapshot` per tracked backend for the admin API/CLI/MCP.
///
/// Reads the current breaker map through `self.breakers.load()` and copies out
/// each backend's id, state (as a string via `state_to_str`) and failure count.
/// No sorting or filtering is applied here.
pub fn snapshot_all(&self) -> Vec<agent_api::types::CircuitBreakerSnapshot> {
    self.breakers
        .load()
        .iter()
        .map(|(backend_id, breaker)| {
            agent_api::types::CircuitBreakerSnapshot {
                backend_id: backend_id.clone(),
                state: state_to_str(breaker.state()).to_string(),
                failure_count: breaker.failure_count(),
            }
        })
        .collect()
}

/// Number of tracked circuit breakers whose current state is `CircuitState::Open`.
pub fn open_count(&self) -> usize {
    let breakers = self.breakers.load();
    breakers
        .iter()
        .filter(|(_, breaker)| breaker.state() == CircuitState::Open)
        .count()
}

/// Remove circuit breaker for backend
#[allow(dead_code)]
pub fn remove_backend(&self, backend_id: &str) {
Expand Down
43 changes: 27 additions & 16 deletions control/src/proxy/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Includes protocol detection (HTTP/1.1 vs HTTP/2), connection pooling,
//! timeout enforcement, and hop-by-hop header filtering.

use crate::error::ProxyError;
use crate::proxy::backend_pool::{BackendConnectionPools, PoolError};
use crate::proxy::filters::Timeout;
use crate::proxy::worker::Worker;
Expand Down Expand Up @@ -76,7 +77,7 @@ pub async fn forward_to_backend(
workers: Option<Workers>,
worker_index: Option<usize>,
timeout_config: Option<&Timeout>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, String> {
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ProxyError> {
let request_start = Instant::now();

// Read the incoming request body
Expand All @@ -95,29 +96,37 @@ pub async fn forward_to_backend(
Bytes::new() // Empty body, zero allocation
} else {
// Slow path: POST/PUT/PATCH may have body
// Enforce max body size to prevent OOM from unbounded requests (10MB default)
// Stream body with size limit to prevent OOM (10MB default).
// Enforced during reading — we stop accepting data once the cap is reached.
const MAX_BODY_SIZE: usize = 10 * 1024 * 1024;
let body_read_start = Instant::now();
let collected = body.collect().await.map_err(|e| {
let limited = http_body_util::Limited::new(body, MAX_BODY_SIZE);
let collected = limited.collect().await.map_err(|e| {
// Check if this is a LengthLimitError from http_body_util::Limited
if e.is::<http_body_util::LengthLimitError>() {
error!(
request_id = %request_id,
max_size = MAX_BODY_SIZE,
"Request body exceeded size limit"
);
return ProxyError::BodyTooLarge {
size: 0, // Actual size unknown — reading was aborted at limit
max: MAX_BODY_SIZE,
};
}
error!(
request_id = %request_id,
error.message = %e,
error.type = "request_body_read",
elapsed_us = body_read_start.elapsed().as_micros() as u64,
"Failed to read request body"
);
format!("Failed to read request body: {}", e)
ProxyError::BackendError {
message: format!("Failed to read request body: {}", e),
}
})?;
let bytes = collected.to_bytes();

if bytes.len() > MAX_BODY_SIZE {
return Err(format!(
"Request body too large: {} bytes (max {})",
bytes.len(),
MAX_BODY_SIZE
));
}

let body_read_duration = body_read_start.elapsed();
info!(
request_id = %request_id,
Expand Down Expand Up @@ -387,10 +396,12 @@ pub async fn forward_to_backend(
network.peer.address = %backend,
"Backend request timeout exceeded"
);
return Err(format!(
"TIMEOUT: Backend request exceeded {}ms timeout",
timeout_duration.as_millis()
));
return Err(ProxyError::Timeout {
message: format!(
"Backend request exceeded {}ms timeout",
timeout_duration.as_millis()
),
});
}
}
} else {
Expand Down
33 changes: 33 additions & 0 deletions control/src/proxy/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,16 @@ impl TokenBucket {
fixed_to_float(tokens).min(self.capacity_f64)
}

/// Get burst capacity
///
/// Returns the stored `capacity_f64` field unchanged.
pub fn capacity(&self) -> f64 {
self.capacity_f64
}

/// Refill rate expressed in tokens per second.
///
/// The bucket keeps its rate per millisecond in fixed-point form; this converts
/// that value with `fixed_to_float` and scales it by 1000.
pub fn refill_rate(&self) -> f64 {
    const MS_PER_SECOND: f64 = 1000.0;
    let tokens_per_ms = fixed_to_float(self.refill_rate_per_ms_fixed);
    tokens_per_ms * MS_PER_SECOND
}

/// Reset bucket to full capacity (for testing)
#[cfg(test)]
#[allow(dead_code)]
Expand Down Expand Up @@ -363,6 +373,29 @@ impl RateLimiter {
let snapshot = self.buckets.load();
snapshot.get(route).map(|bucket| bucket.available_tokens())
}

/// Build one `RateLimiterSnapshot` per bucket for the admin API/CLI/MCP.
///
/// Reads the current bucket map through `self.buckets.load()` and records, for
/// every route, the tokens currently available, the bucket capacity and the
/// refill rate. No sorting or filtering is applied here.
pub fn snapshot_all(&self) -> Vec<agent_api::types::RateLimiterSnapshot> {
    self.buckets
        .load()
        .iter()
        .map(|(route_key, token_bucket)| {
            agent_api::types::RateLimiterSnapshot {
                route: route_key.clone(),
                tokens_available: token_bucket.available_tokens(),
                capacity: token_bucket.capacity(),
                refill_rate: token_bucket.refill_rate(),
            }
        })
        .collect()
}

/// Number of buckets whose available token balance is zero or below.
///
/// NOTE(review): the `<= 0.0` test only matches buckets that report a fully
/// drained balance; a bucket holding a fraction of a token is not counted.
/// Confirm this is the intended definition of "exhausted".
pub fn exhausted_count(&self) -> usize {
    let buckets = self.buckets.load();
    buckets
        .iter()
        .filter(|(_, bucket)| bucket.available_tokens() <= 0.0)
        .count()
}
}

impl Default for RateLimiter {
Expand Down
Loading
Loading