Skip to content
Merged
35 changes: 34 additions & 1 deletion src/core/jsonrpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,42 @@ async fn wait_until_port_released(port: u16) {
}
}

#[tokio::test]
/// Regression test for issue #920 — the embedded server's `axum::serve`
/// accept loop must stop within the cancellation timeout when its
/// `CancellationToken` is fired.
///
/// **Ignored by default.** This test calls `run_server_embedded`,
/// which triggers the full production bootstrap (`bootstrap_skill_runtime`
/// → `register_domain_subscribers` → `scheduler_gate::init_global` +
/// `memory::tree::jobs::start` + `composio::start_periodic_sync` +
/// cron scheduler). Those code paths spawn detached `tokio::spawn`
/// background tasks and write to several process-global statics
/// (`STATE: OnceLock`, `SIGNED_OUT: AtomicBool`, `LLM_PERMITS`
/// semaphore, `GLOBAL_REGISTRY` agent.run_turn handler, `STARTED`
/// `std::sync::Once`s, …) — *none of which have teardown semantics*.
/// In a unit-test binary the leaked tasks then race with every other
/// test, multiplying CI wall time by 10–20× (PR #1552 thread). The
/// right shape for this regression is an integration test in a
/// dedicated `tests/` binary where global pollution doesn't affect
/// siblings — tracked as a follow-up.
///
/// To run manually: `cargo test --lib -p openhuman -- --ignored
/// shutdown_token`.
#[tokio::test]
#[ignore = "calls full server bootstrap; leaks process-global state into sibling tests (#1552). Re-cover via integration test."]
async fn shutdown_token_stops_axum_listener_within_timeout() {
let _signed_out_restore = crate::openhuman::scheduler_gate::SignedOutTestGuard::set(false);

let workspace = tempfile::tempdir().expect("workspace tempdir");

// Pin scheduler-gate policy to Aggressive while this test runs so
// the bootstrap's `init_global` snapshot can't capture transient
// CPU pressure and freeze the cached policy at Paused.
std::fs::write(
workspace.path().join("config.toml"),
"[scheduler_gate]\nmode = \"always_on\"\n",
)
.expect("seed scheduler_gate=always_on config.toml");
let _env = EnvVarGuard::set_many(vec![
(
"OPENHUMAN_WORKSPACE",
Expand Down
210 changes: 174 additions & 36 deletions src/openhuman/scheduler_gate/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,16 @@ pub fn update_config(cfg: SchedulerGateConfig) {
/// (e.g. in unit tests) so callers don't deadlock waiting on a sampler that
/// will never start.
///
/// When the signed-out override is set, returns [`Policy::Paused`] with
/// [`PauseReason::SignedOut`] unconditionally — this is the top-priority
/// "host should do no LLM work" signal and ignores config / signals.
/// When the signed-out override is set **and the gate has been initialised**,
/// returns [`Policy::Paused`] with [`PauseReason::SignedOut`] — this is the
/// top-priority "host should do no LLM work" signal and ignores config /
/// signals. We gate on [`STATE`] being present because the override only has
/// a meaningful effect when there are real background workers calling into
/// the gate; in unit tests where `init_global` was never called, a stale
/// `signed_out` flag from an earlier test can otherwise deadlock every
/// subsequent caller (see `wait_for_capacity` for the deadlock path).
pub fn current_policy() -> Policy {
if is_signed_out() {
if STATE.get().is_some() && is_signed_out() {
return Policy::Paused {
reason: PauseReason::SignedOut,
};
Expand Down Expand Up @@ -287,8 +292,22 @@ pub fn is_signed_out() -> bool {
/// Toggle the signed-out override. Set to `true` from `clear_session`
/// and 401-detection sites; set to `false` from `store_session` once a
/// fresh JWT has been written. Idempotent.
///
/// Gated on [`STATE`] being initialised: if the scheduler gate hasn't
/// been started (every unit-test binary, plus the brief pre-`init_global`
/// window during bootstrap), this is a no-op. There are no background
/// workers to stand down in that state, and unconditionally flipping the
/// process-global atomic lets test paths like `clear_session` and
/// `SessionExpiredSubscriber.handle()` leak `true` into subsequent tests
/// that — if anything later promotes [`STATE`] to `Some` — will spin
/// forever in the `paused_poll_ms` branch of [`wait_for_capacity`].
/// Gating at the writer is a belt-and-braces companion to the reader-side
/// guard added in PR #1552.
#[cfg(not(test))]
pub fn set_signed_out(signed_out: bool) {
if STATE.get().is_none() {
return;
}
let prev = SIGNED_OUT.swap(signed_out, Ordering::AcqRel);
if prev != signed_out {
log::info!("[scheduler_gate] signed_out {} -> {}", prev, signed_out);
Expand All @@ -297,6 +316,9 @@ pub fn set_signed_out(signed_out: bool) {

#[cfg(test)]
pub fn set_signed_out(signed_out: bool) {
if STATE.get().is_none() {
return;
}
let Some(id) = test_state::current_id() else {
return;
};
Expand All @@ -306,6 +328,49 @@ pub fn set_signed_out(signed_out: bool) {
}
}

/// Test-only RAII helper that snapshots the per-runtime `signed_out`
/// flag on construction, flips it to `next`, and restores the
/// snapshotted value on drop — even if the test body panics.
///
/// Use this in any test that exercises a code path that itself calls
/// [`set_signed_out`] *after* [`init_global`] has promoted [`STATE`].
/// Notably the JSON-RPC server bootstrap (`run_server_embedded` →
/// `bootstrap_skill_runtime` → `register_domain_subscribers`) flips
/// the flag to `true` whenever the workspace has no stored session
/// token, which is the common case for tests using a fresh
/// `tempfile::tempdir()` workspace.
///
/// Bypasses the writer-side gate at [`set_signed_out`] (which no-ops
/// only when `STATE` is `None`) so it works regardless of whether
/// `init_global` has run.
///
/// The wrapped value is `Some((runtime_id, previous_flag))` when a tokio
/// runtime was current at construction, and `None` otherwise; in the
/// `None` case dropping the guard restores nothing.
///
/// The type is `#[must_use]` because the restore happens in `Drop`: a
/// guard that is not bound to a named local (`let _guard = …`) is dropped
/// at the end of the statement, which undoes the override before the test
/// body gets to run.
#[cfg(test)]
#[must_use = "the previous signed_out value is restored as soon as the guard is dropped; bind it to a named local such as `_guard`"]
pub(crate) struct SignedOutTestGuard(Option<(tokio::runtime::Id, bool)>);

#[cfg(test)]
impl SignedOutTestGuard {
    /// Write `next` into the per-runtime `signed_out` flag and remember
    /// the value it replaced so that dropping the returned guard can put
    /// it back.
    ///
    /// When no tokio runtime id is available (`test_state::current_id()`
    /// returns `None`) nothing is written and the returned guard is inert.
    pub(crate) fn set(next: bool) -> Self {
        // Pair the runtime id with the flag value that was in place before
        // the overwrite; `None` propagates untouched.
        let snapshot = test_state::current_id()
            .map(|id| (id, test_state::set_signed_out_for(id, next)));
        Self(snapshot)
    }
}

#[cfg(test)]
impl Drop for SignedOutTestGuard {
    /// Put back the `signed_out` value captured when the guard was built.
    /// Guards created without a runtime snapshot do nothing.
    fn drop(&mut self) {
        let Some((id, prev)) = self.0 else {
            return;
        };
        test_state::set_signed_out_for(id, prev);
    }
}

/// Most recent sampled signals, or a neutral default if the sampler hasn't run.
pub fn current_signals() -> Signals {
STATE.get().map(|s| s.read().signals).unwrap_or(Signals {
Expand Down Expand Up @@ -343,7 +408,19 @@ pub async fn wait_for_capacity() -> Option<LlmPermit> {
// cadence as the rest of the Paused arm. Holding here (rather than
// returning) means workers naturally resume the instant the user
// signs back in — no respawn dance, no missed wakeups.
if is_signed_out() {
//
// We gate on `STATE.get().is_some()` so the override only fires once
// the gate has been initialised by `init_global`. In unit tests
// where `init_global` was never called there is no background-worker
// pool to stand down, but the per-runtime `signed_out` flag can
// still be `true` from an earlier test that exercised the credentials
// / 401 paths (`clear_session`, RPC 401 dispatch, or
// `SessionExpiredSubscriber.handle()`). Without the gate, every
// subsequent caller of `wait_for_capacity` polls forever on the
// 60-second fallback cadence — manifesting as the
// `openhuman::agent::triage::evaluator::tests::*` hangs reported
// after #1516.
if STATE.get().is_some() && is_signed_out() {
let paused_ms = STATE
.get()
.map(|s| s.read().cfg.paused_poll_ms)
Expand Down Expand Up @@ -500,52 +577,113 @@ mod tests {
drop(second);
}

// `SignedOutTestGuard` lives at module scope (above) so cross-module
// tests (e.g. `core::jsonrpc::tests::shutdown_token_*`) can use it
// too. The local re-import keeps the existing tests below readable
// without fully-qualified paths.
use super::SignedOutTestGuard;

/// Report whether the calling `*_when_gate_uninit` test should bail out
/// because [`STATE`] has already been promoted to `Some`.
///
/// A cross-module test in the same lib-test binary (notably
/// `core::jsonrpc::tests::shutdown_token_*`, which boots the embedded
/// server) may have run `init_global` first. `STATE` is an `OnceLock`
/// with no reset, so the uninitialised-gate regression tests are
/// order-sensitive: they only mean something while `STATE` is still
/// empty. Skipping in that situation avoids a false failure; the leak
/// class those tests guard against remains covered by the writer-side
/// `set_signed_out` gate and the reader-side `wait_for_capacity` guard.
///
/// Returns `true` (after printing a note to stderr naming `test_name`)
/// when the caller should return early, `false` when it should proceed.
fn skip_if_gate_initialised(test_name: &str) -> bool {
    let already_initialised = STATE.get().is_some();
    if already_initialised {
        eprintln!(
            "[scheduler_gate::tests] skipping {test_name}: STATE already \
             initialised by an earlier test in this binary"
        );
    }
    already_initialised
}

#[tokio::test]
async fn signed_out_override_pauses_policy_regardless_of_signals() {
async fn signed_out_is_ignored_when_gate_uninit() {
// In unit tests `init_global` is never called, so `STATE` is `None`.
// In that state the signed-out override is intentionally inert: there
// are no background workers to stand down, and honouring the per-runtime
// flag would let any earlier test that set it (`clear_session`, RPC 401
// dispatch, `SessionExpiredSubscriber`) deadlock every subsequent caller
// of `wait_for_capacity`.
let _g = lock();
// Make sure we start clean: another test may have left the flag on.
set_signed_out(false);
assert!(!is_signed_out(), "precondition: not signed out");
if skip_if_gate_initialised("signed_out_is_ignored_when_gate_uninit") {
return;
}
let _signed_out = SignedOutTestGuard::set(true);

set_signed_out(true);
assert_eq!(
current_policy(),
Policy::Paused {
reason: PauseReason::SignedOut
},
"signed_out override must trump init_global state"
Policy::Normal,
"with STATE uninit, signed_out must NOT change current_policy"
);
}

set_signed_out(false);
assert!(!is_signed_out(), "override must be releasable");
#[tokio::test]
async fn wait_for_capacity_acquires_immediately_when_signed_out_and_uninit() {
    // Guards against a repeat of the
    // `openhuman::agent::triage::evaluator::tests::*` hangs seen after
    // #1516 introduced the `signed_out` override. Other tests in the same
    // `cargo test` binary that go through `clear_session`,
    // `SessionExpiredSubscriber`, or the RPC 401 path can leave the
    // per-runtime flag at `true`. If `wait_for_capacity` honoured that
    // flag while `STATE` is still `None`, every later caller would sit in
    // the `paused_poll_ms` loop on the unconfigured 60-second fallback.
    let _serial = lock();
    let skip = skip_if_gate_initialised(
        "wait_for_capacity_acquires_immediately_when_signed_out_and_uninit",
    );
    if skip {
        return;
    }
    // Simulate the leaked flag; the guard restores the prior value on drop.
    let _forced_signed_out = SignedOutTestGuard::set(true);

    let deadline = TokioDuration::from_millis(500);
    let within_deadline = timeout(deadline, wait_for_capacity()).await;
    let maybe_permit = within_deadline
        .expect("wait_for_capacity must NOT block when STATE is uninit, even if signed_out");
    let permit = maybe_permit.expect("uninit gate still hands back a permit");
    drop(permit);
}

#[tokio::test]
async fn signed_out_makes_wait_for_capacity_block_briefly() {
// We can't easily prove "it polls forever" without invasive setup
// of the poll-interval state, so we just confirm it doesn't hand
// back a permit synchronously while the override is on, then
// releases as soon as it's cleared.
async fn set_signed_out_is_a_noop_when_gate_uninit() {
// Writer-side companion to `signed_out_is_ignored_when_gate_uninit`.
// The production `set_signed_out` must NOT mutate the per-runtime flag
// when `STATE` is uninit, otherwise a `clear_session` call exercised
// in one test leaks `signed_out=true` into every subsequent test in
// the binary. With this gate, only callers that run after `init_global`
// (i.e. real workers in production) ever flip the bit.
//
// Note: because this is a `#[tokio::test]`, a runtime is always
// present, so the `current_id().is_none()` branch in the test-cfg
// implementations of `set_signed_out` and `is_signed_out` is
// unreachable here. The gate we exercise is exclusively the
// `STATE.get().is_none()` early-return.
let _g = lock();
set_signed_out(true);
if skip_if_gate_initialised("set_signed_out_is_a_noop_when_gate_uninit") {
return;
}
// Force the atomic to a known-clean state via the test backdoor.
let _restore = SignedOutTestGuard::set(false);

let handle = tokio::spawn(async { wait_for_capacity().await });
tokio::time::sleep(TokioDuration::from_millis(40)).await;
set_signed_out(true);
assert!(
!handle.is_finished(),
"wait_for_capacity must block while signed out"
!is_signed_out(),
"set_signed_out(true) must no-op when STATE is None"
);

set_signed_out(false);
// Best-effort timeout to keep CI fast if the default poll interval
// is long (init_global isn't run in tests so STATE is None and the
// fallback is 60s) — abort and treat as pass on timeout.
if let Ok(joined) = timeout(TokioDuration::from_millis(200), handle).await {
let permit = joined.expect("join ok");
assert!(permit.is_some(), "permit returned after override cleared");
drop(permit);
}
// Ensure we leave the override clean for any later test.
set_signed_out(false);
assert!(
!is_signed_out(),
"set_signed_out(false) must no-op when STATE is None"
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

#[tokio::test]
Expand Down
3 changes: 3 additions & 0 deletions src/openhuman/scheduler_gate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ pub use gate::{
};
pub use policy::{PauseReason, Policy};
pub use signals::Signals;

#[cfg(test)]
pub(crate) use gate::SignedOutTestGuard;
Loading