Skip to content

Commit a6a37e5

Browse files
committed
stream: fix share consumer premature termination on concurrent pull
When multiple share consumers call `next()` concurrently, they all await the same `#pullFromSource()` call. When it completes, only one item is added to the buffer, so consumers whose cursor doesn't find data would incorrectly return done:true even though the source still has data. Replace the linear fall-through in the consumer `next()` method with a retry loop. Consumers that wake without data at their cursor now re-pull from source. The existing `#pulling` guard ensures only one actual source pull is in flight at a time; other consumers enqueue on `#pullWaiters` at the cost of a single promise allocation.
1 parent 8811c6b commit a6a37e5

File tree

2 files changed

+63
-51
lines changed

2 files changed

+63
-51
lines changed

lib/internal/streams/iter/share.js

Lines changed: 36 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -115,63 +115,48 @@ class ShareImpl {
115115
throw self.#sourceError;
116116
}
117117

118-
if (state.detached) {
119-
return { __proto__: null, done: true, value: undefined };
120-
}
121-
122-
if (self.#cancelled) {
123-
state.detached = true;
124-
self.#consumers.delete(state);
125-
return { __proto__: null, done: true, value: undefined };
126-
}
127-
128-
// Check if data is available in buffer
129-
const bufferIndex = state.cursor - self.#bufferStart;
130-
if (bufferIndex < self.#buffer.length) {
131-
const chunk = self.#buffer.get(bufferIndex);
132-
state.cursor++;
133-
self.#tryTrimBuffer();
134-
return { __proto__: null, done: false, value: chunk };
135-
}
136-
137-
if (self.#sourceExhausted) {
138-
state.detached = true;
139-
self.#consumers.delete(state);
140-
return { __proto__: null, done: true, value: undefined };
141-
}
118+
// Loop until we get data, source is exhausted, or
119+
// consumer is detached. Multiple consumers may be woken
120+
// after a single pull - those that find no data at their
121+
// cursor must re-pull rather than terminating prematurely.
122+
for (;;) {
123+
if (state.detached) {
124+
return { __proto__: null, done: true, value: undefined };
125+
}
142126

143-
// Need to pull from source - check buffer limit
144-
const canPull = await self.#waitForBufferSpace(state);
145-
if (!canPull) {
146-
state.detached = true;
147-
self.#consumers.delete(state);
148-
if (self.#sourceError) throw self.#sourceError;
149-
return { __proto__: null, done: true, value: undefined };
150-
}
127+
if (self.#cancelled) {
128+
state.detached = true;
129+
self.#consumers.delete(state);
130+
return { __proto__: null, done: true, value: undefined };
131+
}
151132

152-
await self.#pullFromSource();
133+
// Check if data is available in buffer
134+
const bufferIndex = state.cursor - self.#bufferStart;
135+
if (bufferIndex < self.#buffer.length) {
136+
const chunk = self.#buffer.get(bufferIndex);
137+
state.cursor++;
138+
self.#tryTrimBuffer();
139+
return { __proto__: null, done: false, value: chunk };
140+
}
153141

154-
if (self.#sourceError) {
155-
state.detached = true;
156-
self.#consumers.delete(state);
157-
throw self.#sourceError;
158-
}
142+
if (self.#sourceExhausted) {
143+
state.detached = true;
144+
self.#consumers.delete(state);
145+
if (self.#sourceError) throw self.#sourceError;
146+
return { __proto__: null, done: true, value: undefined };
147+
}
159148

160-
const newBufferIndex = state.cursor - self.#bufferStart;
161-
if (newBufferIndex < self.#buffer.length) {
162-
const chunk = self.#buffer.get(newBufferIndex);
163-
state.cursor++;
164-
self.#tryTrimBuffer();
165-
return { __proto__: null, done: false, value: chunk };
166-
}
149+
// Need to pull from source - check buffer limit
150+
const canPull = await self.#waitForBufferSpace(state);
151+
if (!canPull) {
152+
state.detached = true;
153+
self.#consumers.delete(state);
154+
if (self.#sourceError) throw self.#sourceError;
155+
return { __proto__: null, done: true, value: undefined };
156+
}
167157

168-
if (self.#sourceExhausted) {
169-
state.detached = true;
170-
self.#consumers.delete(state);
171-
return { __proto__: null, done: true, value: undefined };
158+
await self.#pullFromSource();
172159
}
173-
174-
return { __proto__: null, done: true, value: undefined };
175160
},
176161

177162
async return() {

test/parallel/test-stream-iter-share.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,4 +238,31 @@ Promise.all([
238238
testShareSyncCancel(),
239239
testSyncShareFromSync(),
240240
testSyncShareFromRejectsNonStreamable(),
241+
testShareMultipleConsumersConcurrentPull(),
241242
]).then(common.mustCall());
243+
244+
async function testShareMultipleConsumersConcurrentPull() {
245+
// Regression test: multiple consumers pulling concurrently should each
246+
// receive all items even when only one item is pulled from source at a time.
247+
// Previously, consumers woken after a pull that found no data at their
248+
// cursor would return done:true prematurely (thundering herd bug).
249+
async function* slowSource() {
250+
for (let i = 0; i < 5; i++) {
251+
await new Promise((r) => setTimeout(r, 1));
252+
yield [new TextEncoder().encode(`item-${i}`)];
253+
}
254+
}
255+
const shared = share(slowSource());
256+
const c1 = shared.pull();
257+
const c2 = shared.pull();
258+
const c3 = shared.pull();
259+
260+
const [t1, t2, t3] = await Promise.all([
261+
text(c1), text(c2), text(c3),
262+
]);
263+
264+
const expected = 'item-0item-1item-2item-3item-4';
265+
assert.strictEqual(t1, expected);
266+
assert.strictEqual(t2, expected);
267+
assert.strictEqual(t3, expected);
268+
}

0 commit comments

Comments
 (0)