7 changes: 7 additions & 0 deletions doc/api/errors.md
@@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
A stream method was called that cannot complete because the stream was
destroyed using `stream.destroy()`.

<a id="ERR_STREAM_ITER_MISSING_FLAG"></a>

### `ERR_STREAM_ITER_MISSING_FLAG`

A stream/iter API was used without the `--experimental-stream-iter` CLI flag
enabled.

<a id="ERR_STREAM_NULL_VALUES"></a>

### `ERR_STREAM_NULL_VALUES`
59 changes: 59 additions & 0 deletions doc/api/stream.md
@@ -1998,6 +1998,61 @@ option. In the code example above, data will be in a single chunk if the file
has less than 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable[Symbol.for('Stream.toAsyncStreamable')]()`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns: {AsyncIterable} An `AsyncIterable<Uint8Array[]>` that yields
  batched chunks from the stream.

When the `--experimental-stream-iter` flag is enabled, `Readable` streams
implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient
consumption by the [`stream/iter`][] API.

This provides a batched async iterator that drains the stream's internal
buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead
of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks
are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses).
For object-mode or encoded streams, each chunk is normalized to `Uint8Array`
before batching.

The returned iterator is tagged as a validated source, so [`from()`][stream-iter-from]
passes it through without additional normalization.

```mjs
import { Readable } from 'node:stream';
import { text, from } from 'node:stream/iter';

const readable = new Readable({
  read() { this.push('hello'); this.push(null); },
});

// Readable is automatically consumed via toAsyncStreamable
console.log(await text(from(readable))); // 'hello'
```

```cjs
const { Readable } = require('node:stream');
const { text, from } = require('node:stream/iter');

async function run() {
  const readable = new Readable({
    read() { this.push('hello'); this.push(null); },
  });

  console.log(await text(from(readable))); // 'hello'
}

run().catch(console.error);
```

Without the `--experimental-stream-iter` flag, calling this method throws
[`ERR_STREAM_ITER_MISSING_FLAG`][].

##### `readable[Symbol.asyncDispose]()`

<!-- YAML
@@ -4997,8 +5052,10 @@ contain multi-byte characters.
[`'finish'`]: #event-finish
[`'readable'`]: #event-readable
[`Duplex`]: #class-streamduplex
[`ERR_STREAM_ITER_MISSING_FLAG`]: errors.md#err_stream_iter_missing_flag
[`EventEmitter`]: events.md#class-eventemitter
[`Readable`]: #class-streamreadable
[`Stream.toAsyncStreamable`]: stream_iter.md#streamtoasyncstreamable
[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
[`Transform`]: #class-streamtransform
[`Writable`]: #class-streamwritable
@@ -5024,6 +5081,7 @@ contain multi-byte characters.
[`stream.uncork()`]: #writableuncork
[`stream.unpipe()`]: #readableunpipedestination
[`stream.wrap()`]: #readablewrapstream
[`stream/iter`]: stream_iter.md
[`writable._final()`]: #writable_finalcallback
[`writable._write()`]: #writable_writechunk-encoding-callback
[`writable._writev()`]: #writable_writevchunks-callback
@@ -5052,6 +5110,7 @@ contain multi-byte characters.
[stream-end]: #writableendchunk-encoding-callback
[stream-finished]: #streamfinishedstream-options-callback
[stream-finished-promise]: #streamfinishedstream-options
[stream-iter-from]: stream_iter.md#frominput
[stream-pause]: #readablepause
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options
257 changes: 257 additions & 0 deletions doc/api/stream_iter.md
@@ -1424,6 +1424,258 @@ Compression and decompression transforms for use with `pull()`, `pullSync()`,
`pipeTo()`, and `pipeToSync()` are available via the [`node:zlib/iter`][]
module. See the [`node:zlib/iter` documentation][] for details.

## Classic stream interop

These utility functions bridge between classic
[`stream.Readable`][]/[`stream.Writable`][] streams and the `stream/iter`
API.

Both `fromReadable()` and `fromWritable()` accept duck-typed objects -- they
do not require the input to extend `stream.Readable` or `stream.Writable`
directly. The minimum contract is described below for each function.

### `fromReadable(readable)`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `readable` {stream.Readable|Object} A classic Readable stream or any object
  with `read()` and `on()` methods.
* Returns: {AsyncIterable\<Uint8Array\[]>} A stream/iter async iterable source.

Converts a classic Readable stream (or duck-typed equivalent) into a
stream/iter async iterable source that can be passed to [`from()`][],
[`pull()`][], [`text()`][], etc.

If the object implements the [`toAsyncStreamable`][] protocol (as
`stream.Readable` does), that protocol is used. Otherwise, the function
duck-types on `read()` and `on()` (EventEmitter) and wraps the stream with
a batched async iterator.

The result is cached per instance -- calling `fromReadable()` twice with the
same stream returns the same iterable.

For object-mode or encoded Readable streams, chunks are automatically
normalized to `Uint8Array`.

```mjs
import { Readable } from 'node:stream';
import { fromReadable, text } from 'node:stream/iter';

const readable = new Readable({
  read() { this.push('hello world'); this.push(null); },
});

const result = await text(fromReadable(readable));
console.log(result); // 'hello world'
```

```cjs
const { Readable } = require('node:stream');
const { fromReadable, text } = require('node:stream/iter');

const readable = new Readable({
  read() { this.push('hello world'); this.push(null); },
});

async function run() {
  const result = await text(fromReadable(readable));
  console.log(result); // 'hello world'
}
run();
```

### `fromWritable(writable[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `writable` {stream.Writable|Object} A classic Writable stream or any object
  with `write()` and `on()` methods.
* `options` {Object}
  * `backpressure` {string} Backpressure policy. **Default:** `'strict'`.
    * `'strict'` -- writes are rejected when the buffer is full. Catches
      callers that ignore backpressure.
    * `'block'` -- writes wait for drain when the buffer is full. Recommended
      for use with [`pipeTo()`][].
    * `'drop-newest'` -- writes are silently discarded when the buffer is full.
    * `'drop-oldest'` -- **not supported**. Throws `ERR_INVALID_ARG_VALUE`.
* Returns: {Object} A stream/iter Writer adapter.

Creates a stream/iter Writer adapter from a classic Writable stream (or
duck-typed equivalent). The adapter can be passed to [`pipeTo()`][] as a
destination.

Since all writes on a classic Writable are fundamentally asynchronous,
the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always
return `false` or `-1`, deferring to the async path. The per-write
`options.signal` parameter from the Writer interface is also ignored.

The result is cached per instance -- calling `fromWritable()` twice with the
same stream returns the same Writer.

For duck-typed streams that do not expose `writableHighWaterMark`,
`writableLength`, or similar properties, sensible defaults are used.
Object-mode writables (if detectable) are rejected since the Writer
interface is bytes-only.

```mjs
import { Writable } from 'node:stream';
import { from, fromWritable, pipeTo } from 'node:stream/iter';

const writable = new Writable({
  write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); },
});

await pipeTo(from('hello world'),
             fromWritable(writable, { backpressure: 'block' }));
```

```cjs
const { Writable } = require('node:stream');
const { from, fromWritable, pipeTo } = require('node:stream/iter');

async function run() {
  const writable = new Writable({
    write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); },
  });

  await pipeTo(from('hello world'),
               fromWritable(writable, { backpressure: 'block' }));
}
run();
```

### `toReadable(source[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `source` {AsyncIterable} An `AsyncIterable<Uint8Array[]>` source, such as
  the return value of [`pull()`][] or [`from()`][].
* `options` {Object}
  * `highWaterMark` {number} The internal buffer size in bytes before
    backpressure is applied. **Default:** `65536` (64 KiB).
  * `signal` {AbortSignal} An optional signal to abort the readable.
* Returns: {stream.Readable}

Creates a byte-mode [`stream.Readable`][] from an `AsyncIterable<Uint8Array[]>`
(the native batch format used by the stream/iter API). Each `Uint8Array` in a
yielded batch is pushed as a separate chunk into the Readable.

```mjs
import { createWriteStream } from 'node:fs';
import { from, pull, toReadable } from 'node:stream/iter';
import { compressGzip } from 'node:zlib/iter';

const source = pull(from('hello world'), compressGzip());
const readable = toReadable(source);

readable.pipe(createWriteStream('output.gz'));
```

```cjs
const { createWriteStream } = require('node:fs');
const { from, pull, toReadable } = require('node:stream/iter');
const { compressGzip } = require('node:zlib/iter');

const source = pull(from('hello world'), compressGzip());
const readable = toReadable(source);

readable.pipe(createWriteStream('output.gz'));
```

### `toReadableSync(source[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `source` {Iterable} An `Iterable<Uint8Array[]>` source, such as the
  return value of [`pullSync()`][] or [`fromSync()`][].
* `options` {Object}
  * `highWaterMark` {number} The internal buffer size in bytes before
    backpressure is applied. **Default:** `65536` (64 KiB).
* Returns: {stream.Readable}

Creates a byte-mode [`stream.Readable`][] from a synchronous
`Iterable<Uint8Array[]>`. The `_read()` method pulls from the iterator
synchronously, so data is available immediately via `readable.read()`.

```mjs
import { fromSync, toReadableSync } from 'node:stream/iter';

const source = fromSync('hello world');
const readable = toReadableSync(source);

console.log(readable.read().toString()); // 'hello world'
```

```cjs
const { fromSync, toReadableSync } = require('node:stream/iter');

const source = fromSync('hello world');
const readable = toReadableSync(source);

console.log(readable.read().toString()); // 'hello world'
```

### `toWritable(writer)`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `writer` {Object} A stream/iter Writer. Only the `write()` method is
  required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`,
  and `writev()` are optional.
* Returns: {stream.Writable}

Creates a classic [`stream.Writable`][] backed by a stream/iter Writer.

Each `_write()` / `_writev()` call attempts the Writer's synchronous method
first (`writeSync` / `writevSync`), falling back to the async method if the
sync path returns `false` or throws. Similarly, `_final()` tries `endSync()`
before `end()`. When the sync path succeeds, the callback is deferred via
`queueMicrotask` to preserve the async resolution contract.

The Writable's `highWaterMark` is set to `Number.MAX_SAFE_INTEGER` to
effectively disable its internal buffering, allowing the underlying Writer
to manage backpressure directly.

```mjs
import { push, toWritable } from 'node:stream/iter';

const { writer, readable } = push();
const writable = toWritable(writer);

writable.write('hello');
writable.end();
```

```cjs
const { push, toWritable } = require('node:stream/iter');

const { writer, readable } = push();
const writable = toWritable(writer);

writable.write('hello');
writable.end();
```

## Protocol symbols

These well-known symbols allow third-party objects to participate in the
@@ -1816,10 +2068,15 @@ console.log(textSync(stream)); // 'hello world'
[`arrayBuffer()`]: #arraybuffersource-options
[`bytes()`]: #bytessource-options
[`from()`]: #frominput
[`fromSync()`]: #fromsyncinput
[`node:zlib/iter`]: zlib_iter.md
[`node:zlib/iter` documentation]: zlib_iter.md
[`pipeTo()`]: #pipetosource-transforms-writer-options
[`pull()`]: #pullsource-transforms-options
[`pullSync()`]: #pullsyncsource-transforms-options
[`share()`]: #sharesource-options
[`stream.Readable`]: stream.md#class-streamreadable
[`stream.Writable`]: stream.md#class-streamwritable
[`tap()`]: #tapcallback
[`text()`]: #textsource-options
[`toAsyncStreamable`]: #streamtoasyncstreamable
2 changes: 2 additions & 0 deletions lib/internal/errors.js
@@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED',
  Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_ITER_MISSING_FLAG',
  'The stream/iter API requires the --experimental-stream-iter flag', TypeError);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);