@@ -14,6 +14,13 @@ type OnceWaiter = {
1414 resolve : ( result : InputStreamOnceResult < unknown > ) => void ;
1515 reject : ( error : Error ) => void ;
1616 timeoutHandle ?: ReturnType < typeof setTimeout > ;
17+ // The abort signal and its handler are tracked on the waiter so any
18+ // resolution path (dispatch / timeout / explicit removal) can detach
19+ // the listener. Without this, a long-lived `AbortSignal` reused across
20+ // many `once()` calls accumulates listeners — `{ once: true }` only
21+ // self-clears if the signal actually aborts.
22+ signal ?: AbortSignal ;
23+ abortHandler ?: ( ) => void ;
1724} ;
1825
1926
@@ -36,6 +43,13 @@ export class StandardInputStreamManager implements InputStreamManager {
3643 // reconnect in a tight loop. Reset to 0 by `#dispatch` whenever a
3744 // record flows through.
3845 private reconnectAttempts = new Map < string , number > ( ) ;
46+ // Stream IDs that were explicitly torn down by `disconnectStream`. The
47+ // tail's `.finally` reconnect path consults this set so a deliberate
48+ // teardown isn't immediately undone by the auto-reconnect when
49+ // handlers or once-waiters are still registered. Cleared on the next
50+ // explicit `on()` / `once()` (those are the only legitimate reasons to
51+ // bring the tail back up).
52+ private explicitlyDisconnected = new Set < string > ( ) ;
3953
4054 constructor (
4155 private apiClient : ApiClient ,
@@ -75,6 +89,10 @@ export class StandardInputStreamManager implements InputStreamManager {
7589 on ( streamId : string , handler : InputStreamHandler ) : { off : ( ) => void } {
7690 this . #requireV2Streams( ) ;
7791
92+ // A fresh attach is a legitimate reason to bring the tail back up;
93+ // clear any prior explicit-disconnect flag.
94+ this . explicitlyDisconnected . delete ( streamId ) ;
95+
7896 let handlerSet = this . handlers . get ( streamId ) ;
7997 if ( ! handlerSet ) {
8098 handlerSet = new Set ( ) ;
@@ -107,6 +125,10 @@ export class StandardInputStreamManager implements InputStreamManager {
107125 once ( streamId : string , options ?: InputStreamOnceOptions ) : InputStreamOncePromise < unknown > {
108126 this . #requireV2Streams( ) ;
109127
128+ // A fresh waiter is a legitimate reason to bring the tail back up;
129+ // clear any prior explicit-disconnect flag.
130+ this . explicitlyDisconnected . delete ( streamId ) ;
131+
110132 // Lazily connect a tail for this stream
111133 this . #ensureStreamTailConnected( streamId ) ;
112134
@@ -131,17 +153,16 @@ export class StandardInputStreamManager implements InputStreamManager {
131153 reject ( new Error ( "Aborted" ) ) ;
132154 return ;
133155 }
134- options . signal . addEventListener (
135- "abort" ,
136- ( ) => {
137- if ( waiter . timeoutHandle ) {
138- clearTimeout ( waiter . timeoutHandle ) ;
139- }
140- this . #removeOnceWaiter( streamId , waiter ) ;
141- reject ( new Error ( "Aborted" ) ) ;
142- } ,
143- { once : true }
144- ) ;
156+ const abortHandler = ( ) => {
157+ if ( waiter . timeoutHandle ) {
158+ clearTimeout ( waiter . timeoutHandle ) ;
159+ }
160+ this . #removeOnceWaiter( streamId , waiter ) ;
161+ reject ( new Error ( "Aborted" ) ) ;
162+ } ;
163+ waiter . signal = options . signal ;
164+ waiter . abortHandler = abortHandler ;
165+ options . signal . addEventListener ( "abort" , abortHandler , { once : true } ) ;
145166 }
146167
147168 // Handle timeout — resolve with error result instead of rejecting
@@ -186,6 +207,14 @@ export class StandardInputStreamManager implements InputStreamManager {
186207 }
187208
188209 disconnectStream ( streamId : string ) : void {
210+ // Mark as explicitly disconnected BEFORE we abort, so the tail's
211+ // `.finally` reconnect path sees the flag when it runs (which can be
212+ // synchronous in the AbortError catch). Without this, an in-flight
213+ // `.on(...)` or pending `.once()` would immediately resurrect the
214+ // tail and negate the disconnect — defeating the
215+ // "drop-the-duplicate before .wait() suspends" contract. Cleared on
216+ // the next explicit `on()` / `once()`.
217+ this . explicitlyDisconnected . add ( streamId ) ;
189218 const tail = this . tails . get ( streamId ) ;
190219 if ( tail ) {
191220 tail . abortController . abort ( ) ;
@@ -225,13 +254,17 @@ export class StandardInputStreamManager implements InputStreamManager {
225254 this . seqNums . clear ( ) ;
226255 this . handlers . clear ( ) ;
227256 this . reconnectAttempts . clear ( ) ;
257+ this . explicitlyDisconnected . clear ( ) ;
228258
229259 // Reject all pending once waiters
230260 for ( const [ , waiters ] of this . onceWaiters ) {
231261 for ( const waiter of waiters ) {
232262 if ( waiter . timeoutHandle ) {
233263 clearTimeout ( waiter . timeoutHandle ) ;
234264 }
265+ if ( waiter . signal && waiter . abortHandler ) {
266+ waiter . signal . removeEventListener ( "abort" , waiter . abortHandler ) ;
267+ }
235268 waiter . reject ( new Error ( "Input stream manager reset" ) ) ;
236269 }
237270 }
@@ -259,6 +292,13 @@ export class StandardInputStreamManager implements InputStreamManager {
259292 . finally ( ( ) => {
260293 this . tails . delete ( streamId ) ;
261294
295+ // If the tail was torn down explicitly via `disconnectStream`,
296+ // don't auto-reconnect — that's the whole point of the
297+ // disconnect call. The next `on()` / `once()` clears the flag.
298+ if ( this . explicitlyDisconnected . has ( streamId ) ) {
299+ return ;
300+ }
301+
262302 // Auto-reconnect with exponential backoff if there are still
263303 // active handlers or waiters. Without backoff a persistent
264304 // failure (auth rejected, 5xx, DNS) would reconnect in a tight
@@ -273,6 +313,7 @@ export class StandardInputStreamManager implements InputStreamManager {
273313 this . reconnectAttempts . set ( streamId , attempt + 1 ) ;
274314 const delayMs = computeReconnectDelayMs ( attempt ) ;
275315 setTimeout ( ( ) => {
316+ if ( this . explicitlyDisconnected . has ( streamId ) ) return ;
276317 if ( this . tails . has ( streamId ) ) return ;
277318 const stillHasHandlers =
278319 this . handlers . has ( streamId ) && this . handlers . get ( streamId ) ! . size > 0 ;
@@ -361,6 +402,9 @@ export class StandardInputStreamManager implements InputStreamManager {
361402 if ( waiter . timeoutHandle ) {
362403 clearTimeout ( waiter . timeoutHandle ) ;
363404 }
405+ if ( waiter . signal && waiter . abortHandler ) {
406+ waiter . signal . removeEventListener ( "abort" , waiter . abortHandler ) ;
407+ }
364408 waiter . resolve ( { ok : true , output : data } ) ;
365409 // Also invoke persistent handlers
366410 this . #invokeHandlers( streamId , data ) ;
@@ -410,6 +454,13 @@ export class StandardInputStreamManager implements InputStreamManager {
410454 }
411455
412456 #removeOnceWaiter( streamId : string , waiter : OnceWaiter ) : void {
457+ // Centralized cleanup — both timeout and explicit abort paths funnel
458+ // through here, so detach the abort listener once instead of at every
459+ // callsite. The dispatch path doesn't go through this method (the
460+ // waiter is shifted off inline), so it detaches the listener there.
461+ if ( waiter . signal && waiter . abortHandler ) {
462+ waiter . signal . removeEventListener ( "abort" , waiter . abortHandler ) ;
463+ }
413464 const waiters = this . onceWaiters . get ( streamId ) ;
414465 if ( ! waiters ) return ;
415466 const index = waiters . indexOf ( waiter ) ;
0 commit comments