Skip to content

Commit 52280cd

Browse files
committed
fixup! stream: implement toStreamIterWriter/fromStreamIterWriter
1 parent fb49804 commit 52280cd

15 files changed

+1293
-1130
lines changed

doc/api/stream.md

Lines changed: 1 addition & 245 deletions
Original file line numberDiff line numberDiff line change
@@ -979,94 +979,6 @@ added: v12.3.0
979979

980980
Getter for the property `objectMode` of a given `Writable` stream.
981981

982-
##### `writable.toStreamIterWriter([options])`
983-
984-
<!-- YAML
985-
added: REPLACEME
986-
-->
987-
988-
> Stability: 1 - Experimental
989-
990-
* `options` {Object}
991-
* `backpressure` {string} Backpressure policy. One of `'strict'` (default),
992-
`'block'`, or `'drop-newest'`. See below for details.
993-
* Returns: {Object} A [`stream/iter` Writer][stream-iter-writer] adapter.
994-
995-
When the `--experimental-stream-iter` flag is enabled, returns an adapter
996-
object that conforms to the [`stream/iter`][] Writer interface, allowing the
997-
`Writable` to be used as a destination in the iterable streams API.
998-
999-
Since all writes on a classic `stream.Writable` are fundamentally
1000-
asynchronous, the synchronous methods (`writeSync`, `writevSync`, `endSync`)
1001-
always return `false` or `-1`, deferring to the async path. The per-write
1002-
`options.signal` parameter from the Writer interface is also ignored; classic
1003-
`stream.Writable` has no per-write abort signal support, so cancellation
1004-
should be handled at the pipeline level.
1005-
1006-
**Backpressure policies:**
1007-
1008-
* `'strict'` (default) — writes are rejected with `ERR_INVALID_STATE` when
1009-
the buffer is full (`writableLength >= writableHighWaterMark`). This catches
1010-
callers that ignore backpressure.
1011-
* `'block'` — writes wait for the `'drain'` event when the buffer is full.
1012-
This matches classic `stream.Writable` behavior and is the recommended
1013-
policy when using [`pipeTo()`][stream-iter-pipeto].
1014-
* `'drop-newest'` — writes are silently discarded when the buffer is full.
1015-
The data is not written to the underlying resource, but `writer.end()`
1016-
still reports the total bytes (including dropped bytes) for consistency
1017-
with the Writer spec.
1018-
* `'drop-oldest'`**not supported**. Classic `stream.Writable` does not
1019-
provide an API to evict already-buffered data without risking partial
1020-
eviction of atomic `writev()` batches. Passing this value throws
1021-
`ERR_INVALID_ARG_VALUE`.
1022-
1023-
The adapter maps:
1024-
1025-
* `writer.write(chunk)` — calls `writable.write(chunk)`, subject to the
1026-
backpressure policy.
1027-
* `writer.writev(chunks)` — corks the writable, writes all chunks, then
1028-
uncorks. Subject to the backpressure policy.
1029-
* `writer.end()` — calls `writable.end()` and resolves with total bytes
1030-
written when the `'finish'` event fires.
1031-
* `writer.fail(reason)` — calls `writable.destroy(reason)`.
1032-
* `writer.desiredSize` — returns the available buffer space
1033-
(`writableHighWaterMark - writableLength`), or `null` if the stream
1034-
is destroyed or finished.
1035-
1036-
```mjs
1037-
import { Writable } from 'node:stream';
1038-
import { from, pipeTo } from 'node:stream/iter';
1039-
1040-
const chunks = [];
1041-
const writable = new Writable({
1042-
write(chunk, encoding, cb) { chunks.push(chunk); cb(); },
1043-
});
1044-
1045-
// Use 'block' policy with pipeTo for classic backpressure behavior
1046-
await pipeTo(from('hello world'),
1047-
writable.toStreamIterWriter({ backpressure: 'block' }));
1048-
```
1049-
1050-
```cjs
1051-
const { Writable } = require('node:stream');
1052-
const { from, pipeTo } = require('node:stream/iter');
1053-
1054-
async function run() {
1055-
const chunks = [];
1056-
const writable = new Writable({
1057-
write(chunk, encoding, cb) { chunks.push(chunk); cb(); },
1058-
});
1059-
1060-
await pipeTo(from('hello world'),
1061-
writable.toStreamIterWriter({ backpressure: 'block' }));
1062-
}
1063-
1064-
run().catch(console.error);
1065-
```
1066-
1067-
Without the `--experimental-stream-iter` flag, calling this method throws
1068-
[`ERR_STREAM_ITER_MISSING_FLAG`][].
1069-
1070982
##### `writable[Symbol.asyncDispose]()`
1071983

1072984
<!-- YAML
@@ -2108,7 +2020,7 @@ are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses).
21082020
For object-mode or encoded streams, each chunk is normalized to `Uint8Array`
21092021
before batching.
21102022

2111-
The returned iterator is tagged as a trusted source, so [`from()`][stream-iter-from]
2023+
The returned iterator is tagged as a validated source, so [`from()`][stream-iter-from]
21122024
passes it through without additional normalization.
21132025

21142026
```mjs
@@ -3295,101 +3207,6 @@ Readable.from([
32953207
]);
32963208
```
32973209

3298-
### `stream.Readable.fromStreamIter(source[, options])`
3299-
3300-
<!-- YAML
3301-
added: REPLACEME
3302-
-->
3303-
3304-
> Stability: 1 - Experimental
3305-
3306-
* `source` {AsyncIterable} An `AsyncIterable<Uint8Array[]>` source, such as
3307-
the return value of [`pull()`][] or [`from()`][stream-iter-from].
3308-
* `options` {Object}
3309-
* `highWaterMark` {number} The internal buffer size in bytes before
3310-
backpressure is applied. **Default:** `65536` (64 KB).
3311-
* `signal` {AbortSignal} An optional signal that can be used to abort
3312-
the readable, destroying the stream and cleaning up the source iterator.
3313-
* Returns: {stream.Readable}
3314-
3315-
Creates a byte-mode {stream.Readable} from an `AsyncIterable<Uint8Array[]>`
3316-
(the native batch format used by the [`stream/iter`][] API). Each
3317-
`Uint8Array` in a yielded batch is pushed as a separate chunk into the
3318-
Readable.
3319-
3320-
This method requires the `--experimental-stream-iter` CLI flag.
3321-
3322-
```mjs
3323-
import { Readable } from 'node:stream';
3324-
import { createWriteStream } from 'node:fs';
3325-
import { from, pull } from 'node:stream/iter';
3326-
import { compressGzip } from 'node:zlib/iter';
3327-
3328-
// Bridge a stream/iter pipeline to a classic Readable
3329-
const source = pull(from('hello world'), compressGzip());
3330-
const readable = Readable.fromStreamIter(source);
3331-
3332-
readable.pipe(createWriteStream('output.gz'));
3333-
```
3334-
3335-
```cjs
3336-
const { Readable } = require('node:stream');
3337-
const { createWriteStream } = require('node:fs');
3338-
const { from, pull } = require('node:stream/iter');
3339-
const { compressGzip } = require('node:zlib/iter');
3340-
3341-
const source = pull(from('hello world'), compressGzip());
3342-
const readable = Readable.fromStreamIter(source);
3343-
3344-
readable.pipe(createWriteStream('output.gz'));
3345-
```
3346-
3347-
### `stream.Readable.fromStreamIterSync(source[, options])`
3348-
3349-
<!-- YAML
3350-
added: REPLACEME
3351-
-->
3352-
3353-
> Stability: 1 - Experimental
3354-
3355-
* `source` {Iterable} An `Iterable<Uint8Array[]>` source, such as the
3356-
return value of [`pullSync()`][] or [`fromSync()`][].
3357-
* `options` {Object}
3358-
* `highWaterMark` {number} The internal buffer size in bytes before
3359-
backpressure is applied. **Default:** `65536` (64 KB).
3360-
* Returns: {stream.Readable}
3361-
3362-
Creates a byte-mode {stream.Readable} from a synchronous
3363-
`Iterable<Uint8Array[]>` (the native batch format used by the
3364-
[`stream/iter`][] sync API). Each `Uint8Array` in a yielded batch is
3365-
pushed as a separate chunk into the Readable.
3366-
3367-
The `_read()` method pulls from the iterator synchronously, so data is
3368-
available immediately via `readable.read()` without waiting for async
3369-
callbacks.
3370-
3371-
This method requires the `--experimental-stream-iter` CLI flag.
3372-
3373-
```mjs
3374-
import { Readable } from 'node:stream';
3375-
import { fromSync } from 'node:stream/iter';
3376-
3377-
const source = fromSync('hello world');
3378-
const readable = Readable.fromStreamIterSync(source);
3379-
3380-
console.log(readable.read().toString()); // 'hello world'
3381-
```
3382-
3383-
```cjs
3384-
const { Readable } = require('node:stream');
3385-
const { fromSync } = require('node:stream/iter');
3386-
3387-
const source = fromSync('hello world');
3388-
const readable = Readable.fromStreamIterSync(source);
3389-
3390-
console.log(readable.read().toString()); // 'hello world'
3391-
```
3392-
33933210
### `stream.Readable.fromWeb(readableStream[, options])`
33943211

33953212
<!-- YAML
@@ -3463,62 +3280,6 @@ changes:
34633280
`'bytes'` or undefined.
34643281
* Returns: {ReadableStream}
34653282

3466-
### `stream.Writable.fromStreamIter(writer)`
3467-
3468-
<!-- YAML
3469-
added: REPLACEME
3470-
-->
3471-
3472-
> Stability: 1 - Experimental
3473-
3474-
* `writer` {Object} A [`stream/iter`][] Writer. Only the `write()` method is
3475-
required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`,
3476-
and `writev()` are optional.
3477-
* Returns: {stream.Writable}
3478-
3479-
When the `--experimental-stream-iter` flag is enabled, creates a classic
3480-
`stream.Writable` backed by a [`stream/iter` Writer][stream-iter-writer].
3481-
3482-
Each `_write()` / `_writev()` call attempts the Writer's synchronous method
3483-
first (`writeSync` / `writevSync`), falling back to the async method if the
3484-
sync path returns `false`. Similarly, `_final()` tries `endSync()` before
3485-
`end()`. When the sync path succeeds, the callback is deferred via
3486-
`queueMicrotask` to preserve the async resolution contract that Writable
3487-
internals expect.
3488-
3489-
* `_write(chunk, encoding, cb)` — tries `writer.writeSync(bytes)`, falls
3490-
back to `await writer.write(bytes)`.
3491-
* `_writev(entries, cb)` — tries `writer.writevSync(chunks)`, falls
3492-
back to `await writer.writev(chunks)`. Only defined if `writer.writev`
3493-
exists.
3494-
* `_final(cb)` — tries `writer.endSync()`, falls back to
3495-
`await writer.end()`.
3496-
* `_destroy(err, cb)` — calls `writer.fail(err)`.
3497-
3498-
```mjs
3499-
import { Writable } from 'node:stream';
3500-
import { push, from, pipeTo } from 'node:stream/iter';
3501-
3502-
const { writer, readable } = push();
3503-
const writable = Writable.fromStreamIter(writer);
3504-
3505-
writable.write('hello');
3506-
writable.end();
3507-
```
3508-
3509-
```cjs
3510-
const { Writable } = require('node:stream');
3511-
const { push, from, pipeTo } = require('node:stream/iter');
3512-
3513-
const { writer, readable } = push();
3514-
const writable = Writable.fromStreamIter(writer);
3515-
3516-
writable.write('hello');
3517-
writable.end();
3518-
```
3519-
3520-
This method requires the `--experimental-stream-iter` CLI flag.
3521-
35223283
### `stream.Writable.fromWeb(writableStream[, options])`
35233284

35243285
<!-- YAML
@@ -5298,15 +5059,12 @@ contain multi-byte characters.
52985059
[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
52995060
[`Transform`]: #class-streamtransform
53005061
[`Writable`]: #class-streamwritable
5301-
[`fromSync()`]: stream_iter.md#fromsyncinput
53025062
[`fs.createReadStream()`]: fs.md#fscreatereadstreampath-options
53035063
[`fs.createWriteStream()`]: fs.md#fscreatewritestreampath-options
53045064
[`net.Socket`]: net.md#class-netsocket
53055065
[`process.stderr`]: process.md#processstderr
53065066
[`process.stdin`]: process.md#processstdin
53075067
[`process.stdout`]: process.md#processstdout
5308-
[`pull()`]: stream_iter.md#pullsource-transforms-options
5309-
[`pullSync()`]: stream_iter.md#pullsyncsource-transforms-options
53105068
[`readable._read()`]: #readable_readsize
53115069
[`readable.compose(stream)`]: #readablecomposestream-options
53125070
[`readable.map`]: #readablemapfn-options
@@ -5353,8 +5111,6 @@ contain multi-byte characters.
53535111
[stream-finished]: #streamfinishedstream-options-callback
53545112
[stream-finished-promise]: #streamfinishedstream-options
53555113
[stream-iter-from]: stream_iter.md#frominput
5356-
[stream-iter-pipeto]: stream_iter.md#pipetosource-transforms-writer
5357-
[stream-iter-writer]: stream_iter.md#writer-interface
53585114
[stream-pause]: #readablepause
53595115
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
53605116
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options

0 commit comments

Comments
 (0)