diff --git a/crates/terraphim_agent_supervisor/src/supervisor.rs b/crates/terraphim_agent_supervisor/src/supervisor.rs index cff128b0b..f3c9e5207 100644 --- a/crates/terraphim_agent_supervisor/src/supervisor.rs +++ b/crates/terraphim_agent_supervisor/src/supervisor.rs @@ -70,6 +70,10 @@ pub struct AgentSupervisor { agent_factory: Arc, restart_history: Arc>>, shutdown_signal: Arc>>>, + /// True when the supervisor has exceeded its restart intensity limit and + /// entered an escalated state. Once set, no further restart actions are + /// permitted (TLA+ `NoRestartAfterEscalation` invariant). + escalated: bool, } impl AgentSupervisor { @@ -83,6 +87,7 @@ impl AgentSupervisor { agent_factory, restart_history: Arc::new(Mutex::new(Vec::new())), shutdown_signal: Arc::new(Mutex::new(None)), + escalated: false, } } @@ -216,6 +221,17 @@ impl AgentSupervisor { // Check if restart is allowed if !self.should_restart(&agent_id, &reason, Utc::now()).await? { + if self.escalated { + log::error!( + "Supervisor {} escalated — not restarting agent {}", + self.config.supervisor_id.0, + agent_id + ); + self.stop_agent(&agent_id).await?; + return Err(SupervisionError::MaxRestartsExceeded( + self.config.supervisor_id.clone(), + )); + } log::info!("Not restarting agent {} due to policy", agent_id); // Remove the failed agent self.stop_agent(&agent_id).await?; @@ -241,7 +257,7 @@ impl AgentSupervisor { /// Check if agent should be restarted based on policy. /// The `now` parameter enables deterministic testing of restart window boundaries. async fn should_restart( - &self, + &mut self, agent_id: &AgentPid, reason: &ExitReason, now: DateTime, @@ -277,14 +293,14 @@ impl AgentSupervisor { if !is_allowed { log::warn!( - "Agent {} exceeded restart limits (count: {}, time_window: {:?})", + "Agent {} exceeded restart limits (count: {}, time_window: {:?}). Escalating supervisor {}", agent_id, agent_info.restart_count, - time_since_first_restart + time_since_first_restart, + self.config.supervisor_id.0 ); - return Err(SupervisionError::MaxRestartsExceeded( - self.config.supervisor_id.clone(), - )); + self.escalated = true; + return Ok(false); } Ok(true) @@ -292,6 +308,11 @@ impl AgentSupervisor { /// Restart a single agent async fn restart_agent(&mut self, agent_id: &AgentPid) -> SupervisionResult<()> { + if self.escalated { + return Err(SupervisionError::MaxRestartsExceeded( + self.config.supervisor_id.clone(), + )); + } log::info!("Restarting agent {}", agent_id); // Get agent spec and current restart count @@ -399,6 +420,11 @@ impl AgentSupervisor { /// Restart all agents async fn restart_all_agents(&mut self) -> SupervisionResult<()> { + if self.escalated { + return Err(SupervisionError::MaxRestartsExceeded( + self.config.supervisor_id.clone(), + )); + } log::info!("Restarting all agents"); let agent_specs: Vec = { @@ -431,6 +457,11 @@ impl AgentSupervisor { /// Restart agents from a specific point async fn restart_from_agent(&mut self, failed_agent_id: &AgentPid) -> SupervisionResult<()> { + if self.escalated { + return Err(SupervisionError::MaxRestartsExceeded( + self.config.supervisor_id.clone(), + )); + } log::info!("Restarting from agent {}", failed_agent_id); // Get all agent specs in order @@ -544,6 +575,12 @@ impl AgentSupervisor { pub async fn get_restart_history(&self) -> Vec { self.restart_history.lock().await.clone() } + + /// Check whether the supervisor has entered an escalated state. + /// Once escalated, no further restart actions are permitted. + pub fn is_escalated(&self) -> bool { + self.escalated + } } #[cfg(test)] @@ -626,4 +663,118 @@ mod tests { supervisor.stop().await.unwrap(); } + + #[tokio::test] + async fn test_escalation_after_max_restarts() { + let mut config = SupervisorConfig::default(); + config.restart_policy.strategy = RestartStrategy::OneForOne; + config.restart_policy.intensity.max_restarts = 1; + config.restart_policy.intensity.time_window = Duration::from_secs(3600); + + let factory = Arc::new(TestAgentFactory); + let mut supervisor = AgentSupervisor::new(config, factory); + + supervisor.start().await.unwrap(); + + let spec = AgentSpec::new("test".to_string(), json!({})); + let agent_id = supervisor.spawn_agent(spec).await.unwrap(); + + // First failure — restart_count is 0, so restart is allowed + supervisor + .handle_agent_exit( + agent_id.clone(), + ExitReason::Error("first error".to_string()), + ) + .await + .unwrap(); + assert!( + !supervisor.is_escalated(), + "supervisor should not be escalated after first failure" + ); + + // Second failure — restart_count is now 1, max_restarts is 1, so escalate + let result = supervisor + .handle_agent_exit( + agent_id.clone(), + ExitReason::Error("second error".to_string()), + ) + .await; + assert!(result.is_err(), "escalation should produce an error"); + assert!( + supervisor.is_escalated(), + "supervisor should be escalated after max restarts exceeded" + ); + + supervisor.stop().await.unwrap(); + } + + #[tokio::test] + async fn test_no_restart_after_escalation() { + let mut config = SupervisorConfig::default(); + config.restart_policy.strategy = RestartStrategy::OneForOne; + config.restart_policy.intensity.max_restarts = 1; + config.restart_policy.intensity.time_window = Duration::from_secs(3600); + + let factory = Arc::new(TestAgentFactory); + let mut supervisor = AgentSupervisor::new(config, factory); + + supervisor.start().await.unwrap(); + + let spec = AgentSpec::new("test".to_string(), json!({})); + let agent_id = supervisor.spawn_agent(spec).await.unwrap(); + + // Trigger escalation + supervisor + .handle_agent_exit(agent_id.clone(), ExitReason::Error("first".to_string())) + .await + .unwrap(); + supervisor + .handle_agent_exit(agent_id.clone(), ExitReason::Error("second".to_string())) + .await + .expect_err("should escalate"); + assert!(supervisor.is_escalated()); + + // Attempting to restart the same agent after escalation must fail + let restart_result = supervisor.restart_agent(&agent_id).await; + assert!( + restart_result.is_err(), + "restart_agent must fail after escalation (NoRestartAfterEscalation invariant)" + ); + + // Attempting to restart all agents after escalation must fail + let restart_all_result = supervisor.restart_all_agents().await; + assert!( + restart_all_result.is_err(), + "restart_all_agents must fail after escalation" + ); + + supervisor.stop().await.unwrap(); + } + + #[tokio::test] + async fn test_restart_from_agent_blocked_after_escalation() { + let mut config = SupervisorConfig::default(); + config.restart_policy.strategy = RestartStrategy::RestForOne; + + let factory = Arc::new(TestAgentFactory); + let mut supervisor = AgentSupervisor::new(config, factory); + + supervisor.start().await.unwrap(); + + let spec = AgentSpec::new("test".to_string(), json!({})); + let agent_id = supervisor.spawn_agent(spec).await.unwrap(); + + // Manually enter escalated state to verify the guard independently + supervisor.escalated = true; + assert!(supervisor.is_escalated()); + + // restart_from_agent must be blocked when escalated + let result = supervisor.restart_from_agent(&agent_id).await; + assert!( + result.is_err(), + "restart_from_agent must fail after escalation" + ); + + supervisor.stop().await.unwrap(); + } }