fix: avoid blocking unrelated sessions during pool initialization (#256)#257
fix: avoid blocking unrelated sessions during pool initialization (#256)#257clsung wants to merge 4 commits intoopenabdev:mainfrom
Conversation
…enabdev#256) Refactor SessionPool to move expensive AcpConnection initialization outside of the write lock. This ensures that a single slow or stuck session startup does not block access to other active sessions in the pool. Changes: - Wrap AcpConnection in Arc<Mutex<...>> for granular per-session locking. - Switch SessionPool to use read locks for connection retrieval. - Perform connection spawning and initialization before acquiring the pool's write lock. - Use a fast-path read lock to check session health before attempting a rebuild.
32087ea to
019a125
Compare
|
Rebased this branch onto current What changed:
Validation:
This PR now targets the lock-scope bug from |
masami-agent
left a comment
There was a problem hiding this comment.
Code Review — PR #257
Thanks for the thorough refactor, @clsung. This addresses a real p1 issue — the current pool holds a write lock across AcpConnection::spawn() + initialize() + session_new(), which are all async and can take seconds. During that time, every other thread is blocked.
Architecture change
| Before | After | |
|---|---|---|
| Connection storage | HashMap<String, AcpConnection> |
HashMap<String, Arc<Mutex<AcpConnection>>> |
| Pool lock during spawn | Write lock held across spawn + init + session_new | Read lock for snapshot, no lock during spawn, write lock only for final insert |
with_connection |
Write lock on pool state | Read lock on pool state + per-connection mutex |
| Eviction | Under write lock, direct access | Snapshot + try_lock + remove_if_same_handle |
| Cleanup | Under write lock | Snapshot + try_lock + deferred removal |
This is a significant improvement. The key insight — moving expensive async work outside the pool lock — is correct and well-executed.
✅ What looks good
-
remove_if_same_handle— Smart use ofArc::ptr_eqto prevent ABA problems. If another task replaced the connection between snapshot and removal, the stale handle is not removed. Well-tested with two unit tests. -
with_connectionnow uses read lock — This is the biggest win. Previously, a streaming prompt held a write lock on the entire pool. Now it only holds a per-connection mutex, so other threads are unblocked. -
cleanup_idleusestry_lock— Busy connections are skipped instead of blocking the cleanup sweep. Correct behavior: a busy session is by definition not idle. -
Race condition handling in
get_or_create— After spawning outside the lock, re-checks if another task already created a healthy connection for the same thread. Good. -
HRTB fix on
with_connection—for<'a> FnOnce(&'a mut AcpConnection)is the correct lifetime bound for the new pattern where the mutex guard (not the pool state) owns the borrow.
🔴 Must fix before merge
1. Eviction can fail silently, leaving pool permanently full
if state.active.len() >= self.max_sessions {
if let Some((key, expected_conn, _, sid)) = eviction_candidate {
if remove_if_same_handle(&mut state.active, &key, &expected_conn).is_some() {
// ...
}
}
}
if state.active.len() >= self.max_sessions {
return Err(anyhow!("pool exhausted ({} sessions)"));
}If eviction_candidate is None (all other connections were locked via try_lock) or remove_if_same_handle returns None (connection was replaced), the pool stays full and returns an error. This is correct as a safety net, but the original code would always find an oldest entry because it had direct access under the write lock.
In the new design, under high concurrency, try_lock could fail on all connections, making eviction impossible even though idle sessions exist. Consider falling back to lock().await on the oldest candidate if try_lock fails on all entries, or at least logging a warning when eviction fails due to lock contention.
2. saved_session_id from suspended map is read under read lock but removed under write lock later — potential double-resume
let (existing, saved_session_id) = {
let state = self.state.read().await;
(
state.active.get(thread_id).cloned(),
state.suspended.get(thread_id).cloned(), // read, not removed
)
};The suspended session ID is read (cloned) but not removed from the map. Later, state.suspended.remove(thread_id) happens under the write lock. But between the read and the write, another task for the same thread could also read the same suspended ID and attempt session_load with it. Two connections could try to resume the same session.
This is unlikely (same thread_id racing) but possible if a user sends two messages rapidly in the same thread. Consider either:
- Accepting this as a known edge case (session_load is idempotent-ish)
- Or adding a comment documenting the race window
🟡 Non-blocking
3. had_existing flag for session_reset
let had_existing = existing.is_some();
// ...
if !resumed {
new_conn.session_new(&self.config.working_dir).await?;
if had_existing || saved_session_id.is_some() {
new_conn.session_reset = true;
}
}The original code only set session_reset when saved_session_id.is_some(). This PR adds had_existing as an additional condition. This is a behavioral change — if a stale connection existed but had no suspended session, session_reset is now set. Is this intentional? If so, a comment explaining why would help.
4. suspend_entry function removed
The suspend_entry helper is removed and its logic is inlined in multiple places. This is fine for now, but if more callers need suspension logic, consider extracting a helper again.
5. Connection drop timing
In the old code, suspend_entry explicitly drops the connection (triggering process group kill) inside the write lock. In the new code, the old Arc<Mutex<AcpConnection>> is dropped when remove_if_same_handle returns it and it goes out of scope. This should be fine since the Arc refcount will reach zero, but worth verifying that no other clone holds a reference that would delay the drop.
📝 Summary
| Priority | Item | Status |
|---|---|---|
| 🔴 | Eviction can fail silently under lock contention | Needs fix or documented fallback |
| 🔴 | Potential double-resume race on suspended session | Needs comment or fix |
| 🟡 | had_existing behavioral change |
Needs clarification |
| 🟡 | suspend_entry inlined |
Non-blocking |
| 🟡 | Connection drop timing | Non-blocking |
The core design is sound and addresses the p1 issue well. The two 🔴 items are edge cases but worth addressing before merge given this is the session pool — the most critical concurrency component in the system.
Looking forward to the next revision!
|
Addressed the review follow-up on the latest revision:
Validation:
Latest commit: |
masami-agent
left a comment
There was a problem hiding this comment.
Re-review — all items addressed ✅
@clsung addressed all three review items cleanly. Verified the full diff (not just the items I raised).
🔴 #1 — Eviction under contention
Kept pool exhausted as the safe fallback (correct — better to reject than corrupt state). Added two explicit warning logs:
"pool full but eviction candidate changed before removal"— ABA case"pool full but all other sessions were busy during eviction scan"— alltry_lockfailed
Operators can now diagnose eviction failures from logs. Acceptable approach.
🔴 #2 — Double-resume race
Added a per-thread creation gate (creating: HashMap<String, Arc<Mutex<()>>>). The _create_guard at the top of get_or_create serializes same-thread calls, completely eliminating the race window. The write lock for get_or_insert_gate is brief (just a HashMap insert) and released before any heavy async work. Clean solution.
Unit test get_or_insert_gate_reuses_gate_for_same_thread confirms gate reuse via Arc::ptr_eq.
🟡 #3 — session_reset behavioral change
Inline comment added explaining the intent — stale live entries without a resumable session ID still warrant the reset banner. Makes sense.
Full diff verification
- ✅ Only
src/acp/pool.rschanged — no Cargo.toml/lock version issues - ✅ Lock ordering is correct: write lock for gate insert → release → per-thread gate → read lock for snapshot → no lock during spawn/init → write lock for final insert
- ✅
cleanup_idleandwith_connectionpatterns are consistent withget_or_create - ✅ 3 unit tests covering
remove_if_same_handle(match + mismatch) andget_or_insert_gate - ✅
cargo test: 43 passed, 0 failed
Recommend squash merge.
|
@clsung Two items to fix before this can merge: 1. CI fail — clippy Simplest fix: add a type alias, e.g. type EvictionCandidate = (String, Arc<Mutex<AcpConnection>>, Instant, Option<String>);then use 2. Version regression — Cargo.toml |
|
Addressed the two latest maintainer follow-ups:
Validation:
Latest commit: |
Refactor SessionPool to move expensive AcpConnection initialization outside of the write lock. This ensures that a single slow or stuck session startup does not block access to other active sessions in the pool.
Changes:
Discord Discussion URL: https://discord.com/channels/1491295327620169908/1491365150664560881/1493769615984164975