Skip to content

Commit f823131

Browse files
ruan330claude
andcommitted
refactor(pool): per-connection Arc<Mutex> to unblock concurrent sessions
`SessionPool::with_connection` currently holds the pool's write lock for the entire callback duration. Because `stream_prompt` in discord.rs runs inside that callback and can take many seconds (or minutes) to drain an ACP turn, every other Discord thread is blocked from touching the pool while one session streams — even for `get_or_create` on a completely unrelated thread_id, which only needs the read lock. The fix: wrap each `AcpConnection` in `Arc<Mutex<_>>`. `with_connection` now takes only the pool's read lock long enough to clone the Arc, then locks that specific connection's mutex for the callback. The pool lock is released immediately, so: - Other sessions can still stream concurrently. - `get_or_create` on unrelated thread_ids proceeds without waiting. - Rebuilds still take the write lock briefly (correct — structural change to the HashMap). `cleanup_idle` uses a snapshot-then-probe pattern so the same rule holds on the cleanup path: clone the Arcs under the read lock, release it, then `try_lock` each connection individually. A busy connection is, by definition, not idle — `try_lock` lets us skip it without ever awaiting on a per-connection mutex while holding the pool lock. The write lock is only re-acquired if there are stale entries to remove. This addresses the P1 review comment left by the Codex bot on the original #77, which noted that awaiting `conn_arc.lock()` from inside a held pool write lock would re-introduce the very starvation this refactor is meant to eliminate. This matches the architecture discussed in #78 §2b and closes #58 (pool write lock deadlock during long-running notification loops). Supersedes #59 and #77. Scoped to just the locking change so it can be reviewed in isolation — notification-loop resilience and alive check will follow as separate PRs. No call-site changes: the `with_connection` signature is unchanged. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0588893 commit f823131

1 file changed

Lines changed: 51 additions & 21 deletions

File tree

src/acp/pool.rs

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ use crate::acp::connection::AcpConnection;
22
use crate::config::AgentConfig;
33
use anyhow::{anyhow, Result};
44
use std::collections::HashMap;
5-
use tokio::sync::RwLock;
5+
use std::sync::Arc;
6+
use tokio::sync::{Mutex, RwLock};
67
use tokio::time::Instant;
78
use tracing::{info, warn};
89

910
pub struct SessionPool {
10-
connections: RwLock<HashMap<String, AcpConnection>>,
11+
connections: RwLock<HashMap<String, Arc<Mutex<AcpConnection>>>>,
1112
config: AgentConfig,
1213
max_sessions: usize,
1314
}
@@ -22,22 +23,22 @@ impl SessionPool {
2223
}
2324

2425
pub async fn get_or_create(&self, thread_id: &str) -> Result<()> {
25-
// Check if alive connection exists
26+
// Fast path: alive connection exists — only the read lock is needed.
2627
{
2728
let conns = self.connections.read().await;
28-
if let Some(conn) = conns.get(thread_id) {
29-
if conn.alive() {
29+
if let Some(conn_arc) = conns.get(thread_id) {
30+
if conn_arc.lock().await.alive() {
3031
return Ok(());
3132
}
3233
}
3334
}
3435

35-
// Need to create or rebuild
36+
// Slow path: create or rebuild.
3637
let mut conns = self.connections.write().await;
3738

38-
// Double-check after acquiring write lock
39-
if let Some(conn) = conns.get(thread_id) {
40-
if conn.alive() {
39+
// Double-check after acquiring the write lock.
40+
if let Some(conn_arc) = conns.get(thread_id) {
41+
if conn_arc.lock().await.alive() {
4142
return Ok(());
4243
}
4344
warn!(thread_id, "stale connection, rebuilding");
@@ -64,30 +65,59 @@ impl SessionPool {
6465
conn.session_reset = true;
6566
}
6667

67-
conns.insert(thread_id.to_string(), conn);
68+
conns.insert(thread_id.to_string(), Arc::new(Mutex::new(conn)));
6869
Ok(())
6970
}
7071

71-
/// Get mutable access to a connection. Caller must have called get_or_create first.
72+
/// Run `f` against a mutable connection reference. Only this connection's
73+
/// per-session mutex is held for the callback's duration — the pool lock
74+
/// is released immediately, so concurrent sessions are not blocked.
75+
/// Caller must have called `get_or_create` first.
7276
pub async fn with_connection<F, R>(&self, thread_id: &str, f: F) -> Result<R>
7377
where
7478
F: FnOnce(&mut AcpConnection) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R>> + Send + '_>>,
7579
{
76-
let mut conns = self.connections.write().await;
77-
let conn = conns
78-
.get_mut(thread_id)
79-
.ok_or_else(|| anyhow!("no connection for thread {thread_id}"))?;
80-
f(conn).await
80+
let conn_arc = {
81+
let conns = self.connections.read().await;
82+
conns
83+
.get(thread_id)
84+
.cloned()
85+
.ok_or_else(|| anyhow!("no connection for thread {thread_id}"))?
86+
};
87+
let mut conn = conn_arc.lock().await;
88+
f(&mut conn).await
8189
}
8290

8391
pub async fn cleanup_idle(&self, ttl_secs: u64) {
8492
let cutoff = Instant::now() - std::time::Duration::from_secs(ttl_secs);
93+
94+
// Snapshot the Arcs under the read lock, then release it before
95+
// awaiting any per-connection mutex. Otherwise a long-running
96+
// `session_prompt` would block `cleanup_idle` on the connection
97+
// mutex while it still held the pool write lock, re-introducing
98+
// exactly the starvation this refactor is meant to eliminate.
99+
let snapshot: Vec<(String, Arc<Mutex<AcpConnection>>)> = {
100+
let conns = self.connections.read().await;
101+
conns.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
102+
};
103+
104+
// Probe each connection under its own mutex. `try_lock` skips
105+
// connections that are currently in use — they are by definition
106+
// not idle, so there is nothing to clean up for them this round.
107+
let mut stale = Vec::new();
108+
for (key, conn_arc) in &snapshot {
109+
let Ok(conn) = conn_arc.try_lock() else { continue };
110+
if conn.last_active < cutoff || !conn.alive() {
111+
stale.push(key.clone());
112+
}
113+
}
114+
115+
if stale.is_empty() {
116+
return;
117+
}
118+
119+
// Only now take the pool write lock to remove the stale entries.
85120
let mut conns = self.connections.write().await;
86-
let stale: Vec<String> = conns
87-
.iter()
88-
.filter(|(_, c)| c.last_active < cutoff || !c.alive())
89-
.map(|(k, _)| k.clone())
90-
.collect();
91121
for key in stale {
92122
info!(thread_id = %key, "cleaning up idle session");
93123
conns.remove(&key);

0 commit comments

Comments
 (0)