Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 157 additions & 6 deletions crates/terraphim_agent_supervisor/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ pub struct AgentSupervisor {
agent_factory: Arc<dyn AgentFactory>,
restart_history: Arc<Mutex<Vec<RestartEntry>>>,
shutdown_signal: Arc<Mutex<Option<tokio::sync::oneshot::Receiver<()>>>>,
/// True when the supervisor has exceeded its restart intensity limit and
/// entered an escalated state. Once set, no further restart actions are
/// permitted (TLA+ `NoRestartAfterEscalation` invariant).
escalated: bool,
}

impl AgentSupervisor {
Expand All @@ -83,6 +87,7 @@ impl AgentSupervisor {
agent_factory,
restart_history: Arc::new(Mutex::new(Vec::new())),
shutdown_signal: Arc::new(Mutex::new(None)),
escalated: false,
}
}

Expand Down Expand Up @@ -216,6 +221,17 @@ impl AgentSupervisor {

// Check if restart is allowed
if !self.should_restart(&agent_id, &reason, Utc::now()).await? {
if self.escalated {
log::error!(
"Supervisor {} escalated — not restarting agent {}",
self.config.supervisor_id.0,
agent_id
);
self.stop_agent(&agent_id).await?;
return Err(SupervisionError::MaxRestartsExceeded(
self.config.supervisor_id.clone(),
));
}
log::info!("Not restarting agent {} due to policy", agent_id);
// Remove the failed agent
self.stop_agent(&agent_id).await?;
Expand All @@ -241,7 +257,7 @@ impl AgentSupervisor {
/// Check if agent should be restarted based on policy.
/// The `now` parameter enables deterministic testing of restart window boundaries.
async fn should_restart(
&self,
&mut self,
agent_id: &AgentPid,
reason: &ExitReason,
now: DateTime<Utc>,
Expand Down Expand Up @@ -277,21 +293,26 @@ impl AgentSupervisor {

if !is_allowed {
log::warn!(
"Agent {} exceeded restart limits (count: {}, time_window: {:?})",
"Agent {} exceeded restart limits (count: {}, time_window: {:?}). Escalating supervisor {}",
agent_id,
agent_info.restart_count,
time_since_first_restart
time_since_first_restart,
self.config.supervisor_id.0
);
return Err(SupervisionError::MaxRestartsExceeded(
self.config.supervisor_id.clone(),
));
self.escalated = true;
return Ok(false);
}

Ok(true)
}

/// Restart a single agent
async fn restart_agent(&mut self, agent_id: &AgentPid) -> SupervisionResult<()> {
if self.escalated {
return Err(SupervisionError::MaxRestartsExceeded(
self.config.supervisor_id.clone(),
));
}
log::info!("Restarting agent {}", agent_id);

// Get agent spec and current restart count
Expand Down Expand Up @@ -399,6 +420,11 @@ impl AgentSupervisor {

/// Restart all agents
async fn restart_all_agents(&mut self) -> SupervisionResult<()> {
if self.escalated {
return Err(SupervisionError::MaxRestartsExceeded(
self.config.supervisor_id.clone(),
));
}
log::info!("Restarting all agents");

let agent_specs: Vec<AgentSpec> = {
Expand Down Expand Up @@ -431,6 +457,11 @@ impl AgentSupervisor {

/// Restart agents from a specific point
async fn restart_from_agent(&mut self, failed_agent_id: &AgentPid) -> SupervisionResult<()> {
if self.escalated {
return Err(SupervisionError::MaxRestartsExceeded(
self.config.supervisor_id.clone(),
));
}
log::info!("Restarting from agent {}", failed_agent_id);

// Get all agent specs in order
Expand Down Expand Up @@ -544,6 +575,12 @@ impl AgentSupervisor {
/// Return a snapshot of the restart history recorded so far.
pub async fn get_restart_history(&self) -> Vec<RestartEntry> {
    let history = self.restart_history.lock().await;
    history.clone()
}

/// Returns `true` once the supervisor has exceeded its restart intensity
/// limit and escalated; from that point on every restart action is rejected.
pub fn is_escalated(&self) -> bool {
    self.escalated
}
}

#[cfg(test)]
Expand Down Expand Up @@ -626,4 +663,118 @@ mod tests {

supervisor.stop().await.unwrap();
}

#[tokio::test]
async fn test_escalation_after_max_restarts() {
    // Policy under test: at most one restart within a one-hour window.
    let mut config = SupervisorConfig::default();
    config.restart_policy.strategy = RestartStrategy::OneForOne;
    config.restart_policy.intensity.max_restarts = 1;
    config.restart_policy.intensity.time_window = Duration::from_secs(3600);

    let mut supervisor = AgentSupervisor::new(config, Arc::new(TestAgentFactory));
    supervisor.start().await.unwrap();

    let agent_id = supervisor
        .spawn_agent(AgentSpec::new("test".to_string(), json!({})))
        .await
        .unwrap();

    // Failure #1: restart_count is 0, below max_restarts, so the restart proceeds.
    supervisor
        .handle_agent_exit(
            agent_id.clone(),
            ExitReason::Error("first error".to_string()),
        )
        .await
        .unwrap();
    assert!(
        !supervisor.is_escalated(),
        "supervisor should not be escalated after first failure"
    );

    // Failure #2: restart_count has reached max_restarts, so the supervisor escalates.
    let outcome = supervisor
        .handle_agent_exit(
            agent_id.clone(),
            ExitReason::Error("second error".to_string()),
        )
        .await;
    assert!(outcome.is_err(), "escalation should produce an error");
    assert!(
        supervisor.is_escalated(),
        "supervisor should be escalated after max restarts exceeded"
    );

    supervisor.stop().await.unwrap();
}

#[tokio::test]
async fn test_no_restart_after_escalation() {
    // Same intensity policy as the escalation test: one restart per hour.
    let mut config = SupervisorConfig::default();
    config.restart_policy.strategy = RestartStrategy::OneForOne;
    config.restart_policy.intensity.max_restarts = 1;
    config.restart_policy.intensity.time_window = Duration::from_secs(3600);

    let mut supervisor = AgentSupervisor::new(config, Arc::new(TestAgentFactory));
    supervisor.start().await.unwrap();

    let agent_id = supervisor
        .spawn_agent(AgentSpec::new("test".to_string(), json!({})))
        .await
        .unwrap();

    // Drive the supervisor into the escalated state: one allowed restart,
    // then a second failure that exceeds the intensity limit.
    supervisor
        .handle_agent_exit(agent_id.clone(), ExitReason::Error("first".to_string()))
        .await
        .unwrap();
    supervisor
        .handle_agent_exit(agent_id.clone(), ExitReason::Error("second".to_string()))
        .await
        .expect_err("should escalate");
    assert!(supervisor.is_escalated());

    // Every restart entry point must now be rejected.
    assert!(
        supervisor.restart_agent(&agent_id).await.is_err(),
        "restart_agent must fail after escalation (NoRestartAfterEscalation invariant)"
    );
    assert!(
        supervisor.restart_all_agents().await.is_err(),
        "restart_all_agents must fail after escalation"
    );

    supervisor.stop().await.unwrap();
}

#[tokio::test]
async fn test_restart_from_agent_blocked_after_escalation() {
    let mut config = SupervisorConfig::default();
    config.restart_policy.strategy = RestartStrategy::RestForOne;

    let mut supervisor = AgentSupervisor::new(config, Arc::new(TestAgentFactory));
    supervisor.start().await.unwrap();

    let agent_id = supervisor
        .spawn_agent(AgentSpec::new("test".to_string(), json!({})))
        .await
        .unwrap();

    // Force the escalated flag directly so the guard inside
    // restart_from_agent is exercised in isolation from the escalation path.
    supervisor.escalated = true;
    assert!(supervisor.is_escalated());

    assert!(
        supervisor.restart_from_agent(&agent_id).await.is_err(),
        "restart_from_agent must fail after escalation"
    );

    supervisor.stop().await.unwrap();
}
}
Loading