Skip to content

Commit 84df861

Browse files
committed
stream: fixup stream/iter tests
1 parent 4e44ece commit 84df861

25 files changed

Lines changed: 1089 additions & 330 deletions

test/parallel/test-stream-iter-broadcast-backpressure.js

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,82 @@ async function testBlockBackpressure() {
5757
// Next write should block
5858
let writeResolved = false;
5959
const writePromise = writer.write('b').then(() => { writeResolved = true; });
60-
await new Promise((r) => setTimeout(r, 10));
60+
await new Promise(setImmediate);
6161
assert.strictEqual(writeResolved, false);
6262

63-
// Drain consumer
63+
// Drain consumer to unblock the pending write
6464
const iter = consumer[Symbol.asyncIterator]();
65-
await iter.next();
66-
await new Promise((r) => setTimeout(r, 10));
65+
const first = await iter.next();
66+
assert.strictEqual(first.done, false);
67+
await new Promise(setImmediate);
6768
assert.strictEqual(writeResolved, true);
69+
6870
writer.endSync();
71+
// Drain remaining data and verify completion
72+
const second = await iter.next();
73+
assert.strictEqual(second.done, false);
74+
await writePromise;
75+
}
76+
77+
// Verify block backpressure data flows correctly end-to-end
78+
async function testBlockBackpressureContent() {
79+
const { writer, broadcast: bc } = broadcast({
80+
highWaterMark: 1,
81+
backpressure: 'block',
82+
});
83+
const consumer = bc.push();
84+
85+
writer.writeSync('a');
86+
const writePromise = writer.write('b');
87+
await new Promise(setImmediate);
88+
89+
// Read all and verify content
90+
const iter = consumer[Symbol.asyncIterator]();
91+
const first = await iter.next();
92+
assert.strictEqual(first.done, false);
93+
const firstStr = new TextDecoder().decode(first.value[0]);
94+
assert.strictEqual(firstStr, 'a');
95+
6996
await writePromise;
97+
writer.endSync();
98+
99+
const second = await iter.next();
100+
assert.strictEqual(second.done, false);
101+
const secondStr = new TextDecoder().decode(second.value[0]);
102+
assert.strictEqual(secondStr, 'b');
103+
104+
const done = await iter.next();
105+
assert.strictEqual(done.done, true);
106+
}
107+
108+
// Writev async path
109+
async function testWritevAsync() {
110+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
111+
const consumer = bc.push();
112+
113+
await writer.writev(['hello', ' ', 'world']);
114+
await writer.end();
115+
116+
const data = await text(consumer);
117+
assert.strictEqual(data, 'hello world');
118+
}
119+
120+
// endSync returns the total byte count
121+
async function testEndSyncReturnValue() {
122+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
123+
bc.push(); // Need a consumer to write to
124+
125+
writer.writeSync('hello'); // 5 bytes
126+
writer.writeSync(' world'); // 6 bytes
127+
const total = writer.endSync();
128+
assert.strictEqual(total, 11);
70129
}
71130

72131
Promise.all([
73132
testDropOldest(),
74133
testDropNewest(),
75134
testBlockBackpressure(),
135+
testBlockBackpressureContent(),
136+
testWritevAsync(),
137+
testEndSyncReturnValue(),
76138
]).then(common.mustCall());

test/parallel/test-stream-iter-broadcast-basic.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,12 @@ async function testConsumerCount() {
5555
bc.push();
5656
assert.strictEqual(bc.consumerCount, 2);
5757

58-
// Consume c1 to completion (it returns immediately since no data has been
59-
// pushed and we haven't ended yet - but we'll cancel to detach)
6058
bc.cancel();
6159

62-
// After cancel, consumers are detached
60+
// After cancel, consumer count drops to 0
61+
assert.strictEqual(bc.consumerCount, 0);
62+
63+
// Consumers are detached and yield nothing
6364
const batches = [];
6465
for await (const batch of c1) {
6566
batches.push(batch);
@@ -103,7 +104,7 @@ async function testWriterEnd() {
103104

104105
await writer.write('data');
105106
const totalBytes = await writer.end();
106-
assert.ok(totalBytes > 0);
107+
assert.strictEqual(totalBytes, 4); // 'data' = 4 UTF-8 bytes
107108

108109
const data = await text(consumer);
109110
assert.strictEqual(data, 'data');
@@ -175,6 +176,9 @@ async function testFailDetachesConsumers() {
175176
await writer.write('data');
176177
await writer.fail(new Error('writer failed'));
177178

179+
// After fail, consumers are detached
180+
assert.strictEqual(bc.consumerCount, 0);
181+
178182
// Both consumers should see the error
179183
await assert.rejects(
180184
async () => {

test/parallel/test-stream-iter-broadcast-from.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ async function testBroadcastFromAsyncIterable() {
2020

2121
async function testBroadcastFromNonArrayChunks() {
2222
// Source that yields single Uint8Array chunks (not arrays)
23+
const enc = new TextEncoder();
2324
async function* singleChunkSource() {
24-
yield new TextEncoder().encode('hello');
25-
yield new TextEncoder().encode(' world');
25+
yield enc.encode('hello');
26+
yield enc.encode(' world');
2627
}
2728
const { broadcast: bc } = Broadcast.from(singleChunkSource());
2829
const consumer = bc.push();
@@ -98,10 +99,11 @@ async function testBroadcastFromCancelWhileBlocked() {
9899
// Create a slow async source that blocks between yields
99100
let sourceFinished = false;
100101
async function* slowSource() {
101-
yield [new TextEncoder().encode('chunk1')];
102+
const enc = new TextEncoder();
103+
yield [enc.encode('chunk1')];
102104
// Simulate a long delay - the cancel should unblock this
103105
await new Promise((resolve) => setTimeout(resolve, 10000));
104-
yield [new TextEncoder().encode('chunk2')];
106+
yield [enc.encode('chunk2')];
105107
sourceFinished = true;
106108
}
107109

test/parallel/test-stream-iter-consumers-bytes.js

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,46 +21,41 @@ const {
2121
// =============================================================================
2222

2323
async function testBytesSyncBasic() {
24-
const source = fromSync('hello');
25-
const data = bytesSync(source);
24+
const data = bytesSync(fromSync('hello'));
2625
assert.deepStrictEqual(data, new TextEncoder().encode('hello'));
2726
}
2827

2928
async function testBytesSyncLimit() {
30-
const source = fromSync('hello world');
3129
assert.throws(
32-
() => bytesSync(source, { limit: 3 }),
30+
() => bytesSync(fromSync('hello world'), { limit: 3 }),
3331
{ name: 'RangeError' },
3432
);
3533
}
3634

3735
async function testBytesAsync() {
38-
const source = from('hello-async');
39-
const data = await bytes(source);
36+
const data = await bytes(from('hello-async'));
4037
assert.deepStrictEqual(data, new TextEncoder().encode('hello-async'));
4138
}
4239

4340
async function testBytesAsyncLimit() {
44-
const source = from('hello world');
4541
await assert.rejects(
46-
() => bytes(source, { limit: 3 }),
42+
() => bytes(from('hello world'), { limit: 3 }),
4743
{ name: 'RangeError' },
4844
);
4945
}
5046

5147
async function testBytesAsyncAbort() {
5248
const ac = new AbortController();
5349
ac.abort();
54-
const source = from('data');
5550
await assert.rejects(
56-
() => bytes(source, { signal: ac.signal }),
57-
(err) => err.name === 'AbortError',
51+
() => bytes(from('data'), { signal: ac.signal }),
52+
{ name: 'AbortError' },
5853
);
5954
}
6055

6156
async function testBytesEmpty() {
62-
const source = from([]);
63-
const data = await bytes(source);
57+
const data = await bytes(from([]));
58+
assert.ok(data instanceof Uint8Array);
6459
assert.strictEqual(data.byteLength, 0);
6560
}
6661

@@ -69,17 +64,15 @@ async function testBytesEmpty() {
6964
// =============================================================================
7065

7166
async function testArrayBufferSyncBasic() {
72-
const source = fromSync(new Uint8Array([1, 2, 3]));
73-
const ab = arrayBufferSync(source);
67+
const ab = arrayBufferSync(fromSync(new Uint8Array([1, 2, 3])));
7468
assert.ok(ab instanceof ArrayBuffer);
7569
assert.strictEqual(ab.byteLength, 3);
7670
const view = new Uint8Array(ab);
7771
assert.deepStrictEqual(view, new Uint8Array([1, 2, 3]));
7872
}
7973

8074
async function testArrayBufferAsync() {
81-
const source = from(new Uint8Array([10, 20, 30]));
82-
const ab = await arrayBuffer(source);
75+
const ab = await arrayBuffer(from(new Uint8Array([10, 20, 30])));
8376
assert.ok(ab instanceof ArrayBuffer);
8477
assert.strictEqual(ab.byteLength, 3);
8578
const view = new Uint8Array(ab);
@@ -96,8 +89,7 @@ async function testArraySyncBasic() {
9689
yield new Uint8Array([2]);
9790
yield new Uint8Array([3]);
9891
}
99-
const source = fromSync(gen());
100-
const chunks = arraySync(source);
92+
const chunks = arraySync(fromSync(gen()));
10193
assert.strictEqual(chunks.length, 3);
10294
assert.deepStrictEqual(chunks[0], new Uint8Array([1]));
10395
assert.deepStrictEqual(chunks[1], new Uint8Array([2]));

test/parallel/test-stream-iter-consumers-merge.js

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ async function testMergeTwoSources() {
3939
}
4040

4141
async function testMergeSingleSource() {
42-
const source = from('only-one');
43-
const merged = merge(source);
44-
45-
const data = await text(merged);
42+
const data = await text(merge(from('only-one')));
4643
assert.strictEqual(data, 'only-one');
4744
}
4845

@@ -59,8 +56,7 @@ async function testMergeWithAbortSignal() {
5956
const ac = new AbortController();
6057
ac.abort();
6158

62-
const source = from('data');
63-
const merged = merge(source, { signal: ac.signal });
59+
const merged = merge(from('data'), { signal: ac.signal });
6460

6561
await assert.rejects(
6662
async () => {
@@ -69,7 +65,7 @@ async function testMergeWithAbortSignal() {
6965
assert.fail('Should not reach here');
7066
}
7167
},
72-
(err) => err.name === 'AbortError',
68+
{ name: 'AbortError' },
7369
);
7470
}
7571

@@ -91,10 +87,11 @@ async function testMergeSyncSources() {
9187

9288
async function testMergeSourceError() {
9389
async function* goodSource() {
94-
yield [new TextEncoder().encode('a')];
90+
const enc = new TextEncoder();
91+
yield [enc.encode('a')];
9592
// Slow so the bad source errors first
9693
await new Promise((r) => setTimeout(r, 50));
97-
yield [new TextEncoder().encode('b')];
94+
yield [enc.encode('b')];
9895
}
9996

10097
async function* badSource() {
@@ -129,17 +126,19 @@ async function testMergeConsumerBreak() {
129126
for await (const _ of merge(source1(), source2())) {
130127
break; // Break after first batch
131128
}
132-
await new Promise((r) => setTimeout(r, 10));
133-
// At least one source should be cleaned up
134-
assert.strictEqual(source1Return || source2Return, true);
129+
// Give async cleanup a tick to complete
130+
await new Promise(setImmediate);
131+
// Both sources should be cleaned up
132+
assert.strictEqual(source1Return && source2Return, true);
135133
}
136134

137135
async function testMergeSignalMidIteration() {
138136
const ac = new AbortController();
139137
async function* slowSource() {
140-
yield [new TextEncoder().encode('a')];
138+
const enc = new TextEncoder();
139+
yield [enc.encode('a')];
141140
await new Promise((r) => setTimeout(r, 100));
142-
yield [new TextEncoder().encode('b')];
141+
yield [enc.encode('b')];
143142
}
144143
const merged = merge(slowSource(), { signal: ac.signal });
145144
const iter = merged[Symbol.asyncIterator]();

test/parallel/test-stream-iter-consumers-tap.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44
const common = require('../common');
55
const assert = require('assert');
66
const {
7+
from,
8+
fromSync,
79
pull,
10+
pullSync,
811
push,
912
tap,
1013
tapSync,
1114
text,
15+
textSync,
1216
} = require('stream/iter');
1317

1418
// =============================================================================
@@ -77,8 +81,50 @@ async function testTapInPipeline() {
7781
assert.strictEqual(seen[0], 'hello');
7882
}
7983

84+
// Tap callback error propagates through async pipeline
85+
async function testTapAsyncErrorPropagation() {
86+
const badTap = tap(() => { throw new Error('tap error'); });
87+
await assert.rejects(async () => {
88+
// eslint-disable-next-line no-unused-vars
89+
for await (const _ of pull(from('hello'), badTap)) { /* consume */ }
90+
}, { message: 'tap error' });
91+
}
92+
93+
// TapSync callback error propagates through sync pipeline
94+
function testTapSyncErrorPropagation() {
95+
const badTap = tapSync(() => { throw new Error('tapSync error'); });
96+
assert.throws(() => {
97+
// eslint-disable-next-line no-unused-vars
98+
for (const _ of pullSync(fromSync('hello'), badTap)) { /* consume */ }
99+
}, { message: 'tapSync error' });
100+
}
101+
102+
// TapSync in a pullSync pipeline passes through data and flush
103+
function testTapSyncInPipeline() {
104+
const seen = [];
105+
let sawFlush = false;
106+
const observer = tapSync((chunks) => {
107+
if (chunks === null) {
108+
sawFlush = true;
109+
} else {
110+
for (const chunk of chunks) {
111+
seen.push(new TextDecoder().decode(chunk));
112+
}
113+
}
114+
});
115+
116+
const data = textSync(pullSync(fromSync('hello'), observer));
117+
assert.strictEqual(data, 'hello');
118+
assert.strictEqual(seen.length, 1);
119+
assert.strictEqual(seen[0], 'hello');
120+
assert.strictEqual(sawFlush, true);
121+
}
122+
80123
Promise.all([
81124
testTapSync(),
82125
testTapAsync(),
83126
testTapInPipeline(),
127+
testTapAsyncErrorPropagation(),
128+
testTapSyncErrorPropagation(),
129+
testTapSyncInPipeline(),
84130
]).then(common.mustCall());

0 commit comments

Comments
 (0)