Skip to content

Commit 17679f8

Browse files
committed
stream: improve Web Compression spec compliance
Pass rejectGarbageAfterEnd: true to createInflateRaw() and createBrotliDecompress(), matching the behavior already in place for deflate and gzip. The Compression Streams spec treats any data following a valid compressed payload as an error. When the underlying Node.js stream throws synchronously from write() (e.g. zlib rejects an invalid chunk type), destroy the stream so that the readable side is also errored. Without this, the readable side hangs forever waiting for data that will never arrive. Unskip WPT compression bad-chunks tests, which now run instead of hanging. Mark the remaining expected failures due to Node.js accepting SharedArrayBuffer-backed TypedArrays.
1 parent 41d5b41 commit 17679f8

6 files changed

Lines changed: 222 additions & 10 deletions

File tree

lib/internal/webstreams/adapters.js

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,22 @@ function newWritableStreamFromStreamWritable(streamWritable) {
220220
if (!streamWritable.writableObjectMode && isArrayBuffer(chunk)) {
221221
chunk = new Uint8Array(chunk);
222222
}
223-
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
224-
backpressurePromise = PromiseWithResolvers();
225-
return SafePromisePrototypeFinally(
226-
backpressurePromise.promise, () => {
227-
backpressurePromise = undefined;
228-
});
223+
// If the underlying Node.js stream throws synchronously from
224+
// write() (e.g. zlib rejects an invalid chunk type), we must
225+
// destroy the stream so that the readable side is also errored.
226+
// Without this, the readable side would hang forever waiting
227+
// for data that will never arrive.
228+
try {
229+
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
230+
backpressurePromise = PromiseWithResolvers();
231+
return SafePromisePrototypeFinally(
232+
backpressurePromise.promise, () => {
233+
backpressurePromise = undefined;
234+
});
235+
}
236+
} catch (error) {
237+
destroy(streamWritable, error);
238+
throw error;
229239
}
230240
},
231241

lib/internal/webstreams/compression.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,19 @@ class DecompressionStream {
108108
});
109109
break;
110110
case 'deflate-raw':
111-
this.#handle = lazyZlib().createInflateRaw();
111+
this.#handle = lazyZlib().createInflateRaw({
112+
rejectGarbageAfterEnd: true,
113+
});
112114
break;
113115
case 'gzip':
114116
this.#handle = lazyZlib().createGunzip({
115117
rejectGarbageAfterEnd: true,
116118
});
117119
break;
118120
case 'brotli':
119-
this.#handle = lazyZlib().createBrotliDecompress();
121+
this.#handle = lazyZlib().createBrotliDecompress({
122+
rejectGarbageAfterEnd: true,
123+
});
120124
break;
121125
}
122126
this.#transform = newReadableWritablePairFromDuplex(this.#handle);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
// Flags: --no-warnings --expose-internals
3+
require('../common');
4+
const assert = require('assert');
5+
const test = require('node:test');
6+
const { Writable } = require('stream');
7+
const {
8+
newWritableStreamFromStreamWritable,
9+
} = require('internal/webstreams/adapters');
10+
11+
// Verify that when the underlying Node.js stream throws synchronously from
12+
// write(), the writable web stream properly rejects and the underlying
13+
// stream is destroyed.
14+
15+
test('WritableStream from Node.js stream handles sync write throw', async () => {
16+
const error = new TypeError('invalid chunk');
17+
const writable = new Writable({
18+
write() {
19+
throw error;
20+
},
21+
});
22+
23+
const ws = newWritableStreamFromStreamWritable(writable);
24+
const writer = ws.getWriter();
25+
26+
await assert.rejects(writer.write('bad'), (err) => {
27+
assert.strictEqual(err, error);
28+
return true;
29+
});
30+
31+
// The underlying stream must be destroyed
32+
assert.strictEqual(writable.destroyed, true);
33+
});
34+
35+
test('WritableStream from Node.js stream - valid writes still work', async () => {
36+
const chunks = [];
37+
const writable = new Writable({
38+
write(chunk, _encoding, cb) {
39+
chunks.push(chunk);
40+
cb();
41+
},
42+
});
43+
44+
const ws = newWritableStreamFromStreamWritable(writable);
45+
const writer = ws.getWriter();
46+
47+
await writer.write(Buffer.from('hello'));
48+
await writer.write(Buffer.from(' world'));
49+
await writer.close();
50+
51+
assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world');
52+
});
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
'use strict';
2+
require('../common');
3+
const assert = require('assert');
4+
const test = require('node:test');
5+
const { CompressionStream, DecompressionStream } = require('stream/web');
6+
7+
// Verify that writing invalid (non-BufferSource) chunks to
8+
// CompressionStream and DecompressionStream properly rejects
9+
// on both the write and the read side, instead of hanging.
10+
11+
const badChunks = [
12+
{ name: 'undefined', value: undefined },
13+
{ name: 'null', value: null },
14+
{ name: 'number', value: 3.14 },
15+
{ name: 'object', value: {} },
16+
{ name: 'array', value: [65] },
17+
];
18+
19+
for (const format of ['deflate', 'deflate-raw', 'gzip', 'brotli']) {
20+
for (const { name, value } of badChunks) {
21+
test(`CompressionStream rejects bad chunk (${name}) for ${format}`, async () => {
22+
const cs = new CompressionStream(format);
23+
const writer = cs.writable.getWriter();
24+
const reader = cs.readable.getReader();
25+
26+
const writePromise = writer.write(value);
27+
const readPromise = reader.read();
28+
29+
await assert.rejects(writePromise, { name: 'TypeError' });
30+
await assert.rejects(readPromise, { name: 'TypeError' });
31+
});
32+
33+
test(`DecompressionStream rejects bad chunk (${name}) for ${format}`, async () => {
34+
const ds = new DecompressionStream(format);
35+
const writer = ds.writable.getWriter();
36+
const reader = ds.readable.getReader();
37+
38+
const writePromise = writer.write(value);
39+
const readPromise = reader.read();
40+
41+
await assert.rejects(writePromise, { name: 'TypeError' });
42+
await assert.rejects(readPromise, { name: 'TypeError' });
43+
});
44+
}
45+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
'use strict';
2+
require('../common');
3+
const assert = require('assert');
4+
const test = require('node:test');
5+
const { CompressionStream, DecompressionStream } = require('stream/web');
6+
7+
// Verify that DecompressionStream rejects trailing data after a valid
8+
// compressed payload for all four supported formats (deflate, deflate-raw, gzip, brotli).
9+
10+
const input = Buffer.from('hello');
11+
const trailingJunk = Buffer.from([0xDE, 0xAD]);
12+
13+
async function compress(format, data) {
14+
const cs = new CompressionStream(format);
15+
const writer = cs.writable.getWriter();
16+
writer.write(data);
17+
writer.close();
18+
const chunks = await Array.fromAsync(cs.readable);
19+
return Buffer.concat(chunks.map((c) => Buffer.from(c)));
20+
}
21+
22+
for (const format of ['deflate', 'deflate-raw', 'gzip', 'brotli']) {
23+
test(`DecompressionStream rejects trailing junk for ${format}`, async () => {
24+
const compressed = await compress(format, input);
25+
const withJunk = Buffer.concat([compressed, trailingJunk]);
26+
27+
const ds = new DecompressionStream(format);
28+
const writer = ds.writable.getWriter();
29+
const reader = ds.readable.getReader();
30+
31+
writer.write(withJunk).catch(() => {});
32+
writer.close().catch(() => {});
33+
34+
await assert.rejects(async () => {
35+
const chunks = [];
36+
while (true) {
37+
const { done, value } = await reader.read();
38+
if (done) break;
39+
chunks.push(value);
40+
}
41+
}, (err) => {
42+
assert(err instanceof Error, `Expected Error, got ${err?.constructor?.name}`);
43+
return true;
44+
});
45+
});
46+
47+
test(`DecompressionStream accepts valid ${format} data without trailing junk`, async () => {
48+
const compressed = await compress(format, input);
49+
50+
const ds = new DecompressionStream(format);
51+
const writer = ds.writable.getWriter();
52+
53+
writer.write(compressed);
54+
writer.close();
55+
56+
const chunks = await Array.fromAsync(ds.readable);
57+
const result = Buffer.concat(chunks.map((c) => Buffer.from(c)));
58+
assert.strictEqual(result.toString(), 'hello');
59+
});
60+
}
61+
62+
// Extra: Verify that trailing data is also rejected when passed as a separate
63+
// chunk after the valid compressed data has been fully written.
64+
for (const format of ['deflate', 'deflate-raw', 'gzip', 'brotli']) {
65+
test(`DecompressionStream rejects trailing junk as separate chunk for ${format}`, async () => {
66+
const compressed = await compress(format, input);
67+
68+
const ds = new DecompressionStream(format);
69+
const writer = ds.writable.getWriter();
70+
const reader = ds.readable.getReader();
71+
72+
writer.write(compressed).catch(() => {});
73+
writer.write(trailingJunk).catch(() => {});
74+
writer.close().catch(() => {});
75+
76+
await assert.rejects(async () => {
77+
while (true) {
78+
const { done } = await reader.read();
79+
if (done) break;
80+
}
81+
}, (err) => {
82+
assert(err instanceof Error, `Expected Error, got ${err?.constructor?.name}`);
83+
return true;
84+
});
85+
});
86+
}

test/wpt/status/compression.json

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,24 @@
11
{
22
"compression-bad-chunks.any.js": {
3-
"skip": "Execution \"hangs\", ArrayBuffer and TypedArray is not accepted and throws, instead of rejects during writer.write"
3+
"fail": {
4+
"note": "Node.js accepts SharedArrayBuffer-backed TypedArrays",
5+
"expected": [
6+
"chunk of type shared Uint8Array should error the stream for deflate",
7+
"chunk of type shared Uint8Array should error the stream for deflate-raw",
8+
"chunk of type shared Uint8Array should error the stream for gzip",
9+
"chunk of type shared Uint8Array should error the stream for brotli"
10+
]
11+
}
412
},
513
"decompression-bad-chunks.any.js": {
6-
"skip": "Execution \"hangs\", ArrayBuffer and TypedArray is not accepted and throws, instead of rejects during writer.write"
14+
"fail": {
15+
"note": "Node.js accepts SharedArrayBuffer-backed TypedArrays and rejects Error instead of TypeError",
16+
"expected": [
17+
"chunk of type shared Uint8Array should error the stream for brotli",
18+
"chunk of type invalid deflate bytes should error the stream for brotli",
19+
"chunk of type invalid gzip bytes should error the stream for brotli"
20+
]
21+
}
722
},
823
"compression-with-detach.window.js": {
924
"requires": ["crypto"]

0 commit comments

Comments
 (0)