diff --git a/doc/api/errors.md b/doc/api/errors.md
index 65ef2ce7bf5d01..98073d49d62098 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
A stream method was called that cannot complete because the stream was
destroyed using `stream.destroy()`.
+
+
+### `ERR_STREAM_ITER_MISSING_FLAG`
+
+A stream/iter API was used without the `--experimental-stream-iter` CLI flag
+enabled.
+
### `ERR_STREAM_NULL_VALUES`
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 65afeaad6306e0..20892fb6cb0a87 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -1998,6 +1998,61 @@ option. In the code example above, data will be in a single chunk if the file
has less then 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].
+##### `readable[Symbol.for('Stream.toAsyncStreamable')]()`
+
+
+
+> Stability: 1 - Experimental
+
+* Returns: {AsyncIterable} An `AsyncIterable` that yields
+ batched chunks from the stream.
+
+When the `--experimental-stream-iter` flag is enabled, `Readable` streams
+implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient
+consumption by the [`stream/iter`][] API.
+
+This provides a batched async iterator that drains the stream's internal
+buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead
+of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks
+are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses).
+For object-mode or encoded streams, each chunk is normalized to `Uint8Array`
+before batching.
+
+The returned iterator is tagged as a validated source, so [`from()`][stream-iter-from]
+passes it through without additional normalization.
+
+```mjs
+import { Readable } from 'node:stream';
+import { text, from } from 'node:stream/iter';
+
+const readable = new Readable({
+ read() { this.push('hello'); this.push(null); },
+});
+
+// Readable is automatically consumed via toAsyncStreamable
+console.log(await text(from(readable))); // 'hello'
+```
+
+```cjs
+const { Readable } = require('node:stream');
+const { text, from } = require('node:stream/iter');
+
+async function run() {
+ const readable = new Readable({
+ read() { this.push('hello'); this.push(null); },
+ });
+
+ console.log(await text(from(readable))); // 'hello'
+}
+
+run().catch(console.error);
+```
+
+Without the `--experimental-stream-iter` flag, calling this method throws
+[`ERR_STREAM_ITER_MISSING_FLAG`][].
+
##### `readable[Symbol.asyncDispose]()`
+
+### `fromReadable(readable)`
+
+> Stability: 1 - Experimental
+
+* `readable` {stream.Readable|Object} A classic Readable stream or any object
+ with `read()` and `on()` methods.
+* Returns: {AsyncIterable} A stream/iter async iterable source.
+
+Converts a classic Readable stream (or duck-typed equivalent) into a
+stream/iter async iterable source that can be passed to [`from()`][],
+[`pull()`][], [`text()`][], etc.
+
+If the object implements the [`toAsyncStreamable`][] protocol (as
+`stream.Readable` does), that protocol is used. Otherwise, the function
+duck-types on `read()` and `on()` (EventEmitter) and wraps the stream with
+a batched async iterator.
+
+The result is cached per instance -- calling `fromReadable()` twice with the
+same stream returns the same iterable.
+
+For object-mode or encoded Readable streams, chunks are automatically
+normalized to `Uint8Array`.
+
+```mjs
+import { Readable } from 'node:stream';
+import { fromReadable, text } from 'node:stream/iter';
+
+const readable = new Readable({
+ read() { this.push('hello world'); this.push(null); },
+});
+
+const result = await text(fromReadable(readable));
+console.log(result); // 'hello world'
+```
+
+```cjs
+const { Readable } = require('node:stream');
+const { fromReadable, text } = require('node:stream/iter');
+
+const readable = new Readable({
+ read() { this.push('hello world'); this.push(null); },
+});
+
+async function run() {
+ const result = await text(fromReadable(readable));
+ console.log(result); // 'hello world'
+}
+run();
+```
+
+### `fromWritable(writable[, options])`
+
+
+
+> Stability: 1 - Experimental
+
+* `writable` {stream.Writable|Object} A classic Writable stream or any object
+ with `write()` and `on()` methods.
+* `options` {Object}
+ * `backpressure` {string} Backpressure policy. **Default:** `'strict'`.
+ * `'strict'` -- writes are rejected when the buffer is full. Catches
+ callers that ignore backpressure.
+ * `'block'` -- writes wait for drain when the buffer is full. Recommended
+ for use with [`pipeTo()`][].
+ * `'drop-newest'` -- writes are silently discarded when the buffer is full.
+ * `'drop-oldest'` -- **not supported**. Throws `ERR_INVALID_ARG_VALUE`.
+* Returns: {Object} A stream/iter Writer adapter.
+
+Creates a stream/iter Writer adapter from a classic Writable stream (or
+duck-typed equivalent). The adapter can be passed to [`pipeTo()`][] as a
+destination.
+
+Since all writes on a classic Writable are fundamentally asynchronous,
+the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always
+return `false` or `-1`, deferring to the async path. The per-write
+`options.signal` parameter from the Writer interface is also ignored.
+
+The result is cached per instance -- calling `fromWritable()` twice with the
+same stream returns the same Writer.
+
+For duck-typed streams that do not expose `writableHighWaterMark`,
+`writableLength`, or similar properties, sensible defaults are used.
+Object-mode writables (if detectable) are rejected since the Writer
+interface is bytes-only.
+
+```mjs
+import { Writable } from 'node:stream';
+import { from, fromWritable, pipeTo } from 'node:stream/iter';
+
+const writable = new Writable({
+ write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); },
+});
+
+await pipeTo(from('hello world'),
+ fromWritable(writable, { backpressure: 'block' }));
+```
+
+```cjs
+const { Writable } = require('node:stream');
+const { from, fromWritable, pipeTo } = require('node:stream/iter');
+
+async function run() {
+ const writable = new Writable({
+ write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); },
+ });
+
+ await pipeTo(from('hello world'),
+ fromWritable(writable, { backpressure: 'block' }));
+}
+run();
+```
+
+### `toReadable(source[, options])`
+
+
+
+> Stability: 1 - Experimental
+
+* `source` {AsyncIterable} An `AsyncIterable` source, such as
+ the return value of [`pull()`][] or [`from()`][].
+* `options` {Object}
+ * `highWaterMark` {number} The internal buffer size in bytes before
+ backpressure is applied. **Default:** `65536` (64 KB).
+ * `signal` {AbortSignal} An optional signal to abort the readable.
+* Returns: {stream.Readable}
+
+Creates a byte-mode [`stream.Readable`][] from an `AsyncIterable`
+(the native batch format used by the stream/iter API). Each `Uint8Array` in a
+yielded batch is pushed as a separate chunk into the Readable.
+
+```mjs
+import { createWriteStream } from 'node:fs';
+import { from, pull, toReadable } from 'node:stream/iter';
+import { compressGzip } from 'node:zlib/iter';
+
+const source = pull(from('hello world'), compressGzip());
+const readable = toReadable(source);
+
+readable.pipe(createWriteStream('output.gz'));
+```
+
+```cjs
+const { createWriteStream } = require('node:fs');
+const { from, pull, toReadable } = require('node:stream/iter');
+const { compressGzip } = require('node:zlib/iter');
+
+const source = pull(from('hello world'), compressGzip());
+const readable = toReadable(source);
+
+readable.pipe(createWriteStream('output.gz'));
+```
+
+### `toReadableSync(source[, options])`
+
+
+
+> Stability: 1 - Experimental
+
+* `source` {Iterable} An `Iterable` source, such as the
+ return value of [`pullSync()`][] or [`fromSync()`][].
+* `options` {Object}
+ * `highWaterMark` {number} The internal buffer size in bytes before
+ backpressure is applied. **Default:** `65536` (64 KB).
+* Returns: {stream.Readable}
+
+Creates a byte-mode [`stream.Readable`][] from a synchronous
+`Iterable`. The `_read()` method pulls from the iterator
+synchronously, so data is available immediately via `readable.read()`.
+
+```mjs
+import { fromSync, toReadableSync } from 'node:stream/iter';
+
+const source = fromSync('hello world');
+const readable = toReadableSync(source);
+
+console.log(readable.read().toString()); // 'hello world'
+```
+
+```cjs
+const { fromSync, toReadableSync } = require('node:stream/iter');
+
+const source = fromSync('hello world');
+const readable = toReadableSync(source);
+
+console.log(readable.read().toString()); // 'hello world'
+```
+
+### `toWritable(writer)`
+
+
+
+> Stability: 1 - Experimental
+
+* `writer` {Object} A stream/iter Writer. Only the `write()` method is
+ required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`,
+ and `writev()` are optional.
+* Returns: {stream.Writable}
+
+Creates a classic [`stream.Writable`][] backed by a stream/iter Writer.
+
+Each `_write()` / `_writev()` call attempts the Writer's synchronous method
+first (`writeSync` / `writevSync`), falling back to the async method if the
+sync path returns `false` or throws. Similarly, `_final()` tries `endSync()`
+before `end()`. When the sync path succeeds, the callback is deferred via
+`queueMicrotask` to preserve the async resolution contract.
+
+The Writable's `highWaterMark` is set to `Number.MAX_SAFE_INTEGER` to
+effectively disable its internal buffering, allowing the underlying Writer
+to manage backpressure directly.
+
+```mjs
+import { push, toWritable } from 'node:stream/iter';
+
+const { writer, readable } = push();
+const writable = toWritable(writer);
+
+writable.write('hello');
+writable.end();
+```
+
+```cjs
+const { push, toWritable } = require('node:stream/iter');
+
+const { writer, readable } = push();
+const writable = toWritable(writer);
+
+writable.write('hello');
+writable.end();
+```
+
## Protocol symbols
These well-known symbols allow third-party objects to participate in the
@@ -1816,10 +2068,15 @@ console.log(textSync(stream)); // 'hello world'
[`arrayBuffer()`]: #arraybuffersource-options
[`bytes()`]: #bytessource-options
[`from()`]: #frominput
+[`fromSync()`]: #fromsyncinput
[`node:zlib/iter`]: zlib_iter.md
[`node:zlib/iter` documentation]: zlib_iter.md
[`pipeTo()`]: #pipetosource-transforms-writer-options
[`pull()`]: #pullsource-transforms-options
+[`pullSync()`]: #pullsyncsource-transforms-options
[`share()`]: #sharesource-options
+[`stream.Readable`]: stream.md#class-streamreadable
+[`stream.Writable`]: stream.md#class-streamwritable
[`tap()`]: #tapcallback
[`text()`]: #textsource-options
+[`toAsyncStreamable`]: #streamtoasyncstreamable
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index 7c4728627731fe..206e2a24716022 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED',
Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
+E('ERR_STREAM_ITER_MISSING_FLAG',
+ 'The stream/iter API requires the --experimental-stream-iter flag', TypeError);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
diff --git a/lib/internal/streams/iter/classic.js b/lib/internal/streams/iter/classic.js
new file mode 100644
index 00000000000000..17aa43f0969202
--- /dev/null
+++ b/lib/internal/streams/iter/classic.js
@@ -0,0 +1,844 @@
+'use strict';
+
+// Interop utilities between classic Node.js streams and the stream/iter API.
+//
+// These are Node.js-specific (not part of the stream/iter spec) and are
+// exported from 'stream/iter' as top-level utility functions:
+//
+// fromReadable(readable) -- classic Readable (or duck-type) -> stream/iter source
+// fromWritable(writable, opts) -- classic Writable (or duck-type) -> stream/iter Writer
+// toReadable(source, opts) -- stream/iter source -> classic Readable
+// toReadableSync(source, opts) -- stream/iter source (sync) -> classic Readable
+// toWritable(writer) -- stream/iter Writer -> classic Writable
+
+const {
+ ArrayIsArray,
+ MathMax,
+ NumberMAX_SAFE_INTEGER,
+ Promise,
+ PromisePrototypeThen,
+ PromiseReject,
+ PromiseResolve,
+ PromiseWithResolvers,
+ SafeWeakMap,
+ SymbolAsyncDispose,
+ SymbolAsyncIterator,
+ SymbolDispose,
+ SymbolIterator,
+ TypedArrayPrototypeGetByteLength,
+} = primordials;
+
+const {
+ AbortError,
+ aggregateTwoErrors,
+ codes: {
+ ERR_INVALID_ARG_TYPE,
+ ERR_INVALID_ARG_VALUE,
+ ERR_INVALID_STATE,
+ ERR_STREAM_WRITE_AFTER_END,
+ },
+} = require('internal/errors');
+
+const {
+ validateInteger,
+ validateObject,
+} = require('internal/validators');
+
+const { eos } = require('internal/streams/end-of-stream');
+const {
+ addAbortSignal: addAbortSignalNoValidate,
+} = require('internal/streams/add-abort-signal');
+const {
+ queueMicrotask,
+} = require('internal/process/task_queues');
+
+const {
+ toAsyncStreamable: kToAsyncStreamable,
+ kValidatedSource,
+ drainableProtocol,
+} = require('internal/streams/iter/types');
+
+const {
+ validateBackpressure,
+ toUint8Array,
+} = require('internal/streams/iter/utils');
+
+const { Buffer } = require('buffer');
+const destroyImpl = require('internal/streams/destroy');
+
+// Lazy-loaded to avoid circular dependencies. Readable and Writable
+// both require this module's parent, so we defer the require.
+let Readable;
+let Writable;
+
+function lazyReadable() {
+ if (Readable === undefined) {
+ Readable = require('internal/streams/readable');
+ }
+ return Readable;
+}
+
+function lazyWritable() {
+ if (Writable === undefined) {
+ Writable = require('internal/streams/writable');
+ }
+ return Writable;
+}
+
+// ============================================================================
+// fromReadable(readable) -- classic Readable -> stream/iter async iterable
+// ============================================================================
+
+// Cache: one stream/iter source per Readable instance.
+const fromReadableCache = new SafeWeakMap();
+
+// Maximum chunks to drain into a single batch. Bounds peak memory when
+// _read() synchronously pushes many chunks into the buffer.
+const MAX_DRAIN_BATCH = 128;
+
+const { normalizeAsyncValue } = require('internal/streams/iter/from');
+const { isUint8Array } = require('internal/util/types');
+
+// Normalize a batch of raw chunks from an object-mode or encoded
+// Readable into Uint8Array values. Returns the normalized batch,
+// or null if normalization produced no output.
+async function normalizeBatch(raw) {
+ const batch = [];
+ for (let i = 0; i < raw.length; i++) {
+ const value = raw[i];
+ if (isUint8Array(value)) {
+ batch.push(value);
+ } else {
+ // normalizeAsyncValue may await for async protocols (e.g.
+ // toAsyncStreamable on yielded objects). Stream events during
+ // the suspension are queued, not lost -- errors will surface
+ // on the next loop iteration after this yield completes.
+ for await (const normalized of normalizeAsyncValue(value)) {
+ batch.push(normalized);
+ }
+ }
+ }
+ return batch.length > 0 ? batch : null;
+}
+
+// Batched async iterator for Readable streams. Same mechanism as
+// createAsyncIterator (same event setup, same stream.read() to
+// trigger _read(), same teardown) but drains all currently buffered
+// chunks into a single Uint8Array[] batch per yield, amortizing the
+// Promise/microtask cost across multiple chunks.
+//
+// When normalize is provided (object-mode / encoded streams), each
+// drained batch is passed through it to convert chunks to Uint8Array.
+// When normalize is null (byte-mode), chunks are already Buffers
+// (Uint8Array subclass) and are yielded directly.
+const nop = () => {};
+
+async function* createBatchedAsyncIterator(stream, normalize) {
+ let callback = nop;
+
+ function next(resolve) {
+ if (this === stream) {
+ callback();
+ callback = nop;
+ } else {
+ callback = resolve;
+ }
+ }
+
+ stream.on('readable', next);
+
+ let error;
+ const cleanup = eos(stream, { writable: false }, (err) => {
+ error = err ? aggregateTwoErrors(error, err) : null;
+ callback();
+ callback = nop;
+ });
+
+ try {
+ while (true) {
+ const chunk = stream.destroyed ? null : stream.read();
+ if (chunk !== null) {
+ const batch = [chunk];
+ while (batch.length < MAX_DRAIN_BATCH &&
+ stream._readableState?.length > 0) {
+ const c = stream.read();
+ if (c === null) break;
+ batch.push(c);
+ }
+ if (normalize !== null) {
+ const result = await normalize(batch);
+ if (result !== null) {
+ yield result;
+ }
+ } else {
+ yield batch;
+ }
+ } else if (error) {
+ throw error;
+ } else if (error === null) {
+ return;
+ } else {
+ await new Promise(next);
+ }
+ }
+ } catch (err) {
+ error = aggregateTwoErrors(error, err);
+ throw error;
+ } finally {
+ if (error === undefined ||
+ (stream._readableState?.autoDestroy)) {
+ destroyImpl.destroyer(stream, null);
+ } else {
+ stream.off('readable', next);
+ cleanup();
+ }
+ }
+}
+
+/**
+ * Convert a classic Readable (or duck-type) to a stream/iter async iterable.
+ *
+ * If the object implements the toAsyncStreamable protocol, delegates to it.
+ * Otherwise, duck-type checks for read() + EventEmitter (on/off) and
+ * wraps with a batched async iterator.
+ * @param {object} readable - A classic Readable or duck-type with
+ * read() and on()/off() methods.
+ * @returns {AsyncIterable} A stream/iter async iterable source.
+ */
+function fromReadable(readable) {
+ if (readable == null || typeof readable !== 'object') {
+ throw new ERR_INVALID_ARG_TYPE('readable', 'Readable', readable);
+ }
+
+ // Check cache first.
+ const cached = fromReadableCache.get(readable);
+ if (cached !== undefined) return cached;
+
+ // Protocol path: object implements toAsyncStreamable.
+ if (typeof readable[kToAsyncStreamable] === 'function') {
+ const result = readable[kToAsyncStreamable]();
+ fromReadableCache.set(readable, result);
+ return result;
+ }
+
+ // Duck-type path: object has read() and EventEmitter methods.
+ if (typeof readable.read !== 'function' ||
+ typeof readable.on !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('readable', 'Readable', readable);
+ }
+
+ // Determine normalization. If the stream has _readableState, use it
+ // to detect object-mode / encoding. Otherwise assume byte-mode.
+ const state = readable._readableState;
+ const normalize = (state && (state.objectMode || state.encoding)) ?
+ normalizeBatch : null;
+
+ const iter = createBatchedAsyncIterator(readable, normalize);
+ iter[kValidatedSource] = true;
+ iter.stream = readable;
+
+ fromReadableCache.set(readable, iter);
+ return iter;
+}
+
+
+// ============================================================================
+// toReadable(source, options) -- stream/iter source -> classic Readable
+// ============================================================================
+
+const kNullPrototype = { __proto__: null };
+
+/**
+ * Create a byte-mode Readable from an AsyncIterable.
+ * The source must yield Uint8Array[] batches (the stream/iter native
+ * format). Each Uint8Array in a batch is pushed as a separate chunk.
+ * @param {AsyncIterable} source
+ * @param {object} [options]
+ * @param {number} [options.highWaterMark]
+ * @param {AbortSignal} [options.signal]
+ * @returns {stream.Readable}
+ */
+function toReadable(source, options = kNullPrototype) {
+ if (typeof source?.[SymbolAsyncIterator] !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('source', 'AsyncIterable', source);
+ }
+
+ validateObject(options, 'options');
+ const {
+ highWaterMark = 64 * 1024,
+ signal,
+ } = options;
+ validateInteger(highWaterMark, 'options.highWaterMark', 0);
+
+ const ReadableCtor = lazyReadable();
+ const iterator = source[SymbolAsyncIterator]();
+ let backpressure;
+ let pumping = false;
+ let done = false;
+
+ const readable = new ReadableCtor({
+ __proto__: null,
+ highWaterMark,
+ read() {
+ if (backpressure) {
+ const { resolve } = backpressure;
+ backpressure = null;
+ resolve();
+ } else if (!pumping && !done) {
+ pumping = true;
+ pump();
+ }
+ },
+ destroy(err, cb) {
+ done = true;
+ // Wake up the pump if it's waiting on backpressure so it
+ // can see done === true and exit cleanly.
+ if (backpressure) {
+ backpressure.resolve();
+ backpressure = null;
+ }
+ if (typeof iterator.return === 'function') {
+ PromisePrototypeThen(iterator.return(),
+ () => cb(err), (e) => cb(e || err));
+ } else {
+ cb(err);
+ }
+ },
+ });
+
+ if (signal) {
+ addAbortSignalNoValidate(signal, readable);
+ }
+
+ async function pump() {
+ try {
+ while (!done) {
+ const { value: batch, done: iterDone } = await iterator.next();
+ if (iterDone) {
+ done = true;
+ readable.push(null);
+ return;
+ }
+ for (let i = 0; i < batch.length; i++) {
+ if (!readable.push(batch[i])) {
+ backpressure = PromiseWithResolvers();
+ await backpressure.promise;
+ if (done) return;
+ }
+ }
+ }
+ } catch (err) {
+ done = true;
+ readable.destroy(err);
+ }
+ }
+
+ return readable;
+}
+
+
+// ============================================================================
+// toReadableSync(source, options) -- stream/iter source (sync) -> Readable
+// ============================================================================
+
+/**
+ * Create a byte-mode Readable from an Iterable.
+ * Fully synchronous -- _read() pulls from the iterator directly.
+ * @param {Iterable} source
+ * @param {object} [options]
+ * @param {number} [options.highWaterMark]
+ * @returns {stream.Readable}
+ */
+function toReadableSync(source, options = kNullPrototype) {
+ if (typeof source?.[SymbolIterator] !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('source', 'Iterable', source);
+ }
+
+ validateObject(options, 'options');
+ const {
+ highWaterMark = 64 * 1024,
+ } = options;
+ validateInteger(highWaterMark, 'options.highWaterMark', 0);
+
+ const ReadableCtor = lazyReadable();
+ const iterator = source[SymbolIterator]();
+
+ return new ReadableCtor({
+ __proto__: null,
+ highWaterMark,
+ read() {
+ for (;;) {
+ const { value: batch, done } = iterator.next();
+ if (done) {
+ this.push(null);
+ return;
+ }
+ for (let i = 0; i < batch.length; i++) {
+ if (!this.push(batch[i])) return;
+ }
+ }
+ },
+ destroy(err, cb) {
+ if (typeof iterator.return === 'function') iterator.return();
+ cb(err);
+ },
+ });
+}
+
+
+// ============================================================================
+// fromWritable(writable, options) -- classic Writable -> stream/iter Writer
+// ============================================================================
+
+// Cache: one Writer adapter per Writable instance.
+const fromWritableCache = new SafeWeakMap();
+
+/**
+ * Create a stream/iter Writer adapter from a classic Writable (or duck-type).
+ *
+ * Duck-type requirements: write() and on()/off() methods.
+ * Falls back to sensible defaults for missing properties like
+ * writableHighWaterMark, writableLength, writableObjectMode.
+ * @param {object} writable - A classic Writable or duck-type.
+ * @param {object} [options]
+ * @param {string} [options.backpressure] - 'strict', 'block',
+ * 'drop-newest'. 'drop-oldest' is not supported.
+ * @returns {object} A stream/iter Writer adapter.
+ */
+function fromWritable(writable, options = kNullPrototype) {
+ if (writable == null ||
+ typeof writable.write !== 'function' ||
+ typeof writable.on !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('writable', 'Writable', writable);
+ }
+
+ // Return cached adapter if available.
+ const cached = fromWritableCache.get(writable);
+ if (cached !== undefined) return cached;
+
+ validateObject(options, 'options');
+ const {
+ backpressure = 'strict',
+ } = options;
+ validateBackpressure(backpressure);
+
+ // The Writer interface is bytes-only. Object-mode Writables expect
+ // arbitrary JS values, which is incompatible.
+ if (writable.writableObjectMode) {
+ throw new ERR_INVALID_STATE(
+ 'Cannot create a stream/iter Writer from an object-mode Writable');
+ }
+
+ // drop-oldest is not supported for classic stream.Writable. The
+ // Writable's internal buffer stores individual { chunk, encoding,
+ // callback } entries with no concept of batch boundaries. A writev()
+ // call fans out into N separate buffer entries, so a subsequent
+ // drop-oldest eviction could partially tear apart an earlier atomic
+ // writev batch. The PushWriter avoids this because writev occupies a
+ // single slot. Supporting drop-oldest here would require either
+ // accepting partial writev eviction or adding batch tracking to the
+ // buffer -- neither is acceptable without a deeper rework of Writable
+ // internals.
+ if (backpressure === 'drop-oldest') {
+ throw new ERR_INVALID_ARG_VALUE('options.backpressure', backpressure,
+ 'drop-oldest is not supported for classic stream.Writable');
+ }
+
+ // Fall back to sensible defaults for duck-typed streams that may not
+ // expose the full stream.Writable property set.
+ const hwm = writable.writableHighWaterMark ?? 16384;
+ let totalBytes = 0;
+
+ // Waiters pending on backpressure resolution (block policy only).
+ // Multiple un-awaited writes can each add a waiter, so this must be
+ // a list. A single persistent 'drain' listener and 'error' listener
+ // (installed once lazily) resolve or reject all waiters to avoid
+ // accumulating per-write listeners on the stream.
+ let waiters = [];
+ let listenersInstalled = false;
+ let onDrain;
+ let onError;
+
+ function installListeners() {
+ if (listenersInstalled) return;
+ listenersInstalled = true;
+ onDrain = () => {
+ const pending = waiters;
+ waiters = [];
+ for (let i = 0; i < pending.length; i++) {
+ pending[i].resolve();
+ }
+ };
+ onError = (err) => {
+ const pending = waiters;
+ waiters = [];
+ for (let i = 0; i < pending.length; i++) {
+ pending[i].reject(err);
+ }
+ };
+ writable.on('drain', onDrain);
+ writable.on('error', onError);
+ }
+
+ // Reject all pending waiters and remove the drain/error listeners.
+ function cleanup(err) {
+ const pending = waiters;
+ waiters = [];
+ for (let i = 0; i < pending.length; i++) {
+ pending[i].reject(err ?? new AbortError());
+ }
+ if (!listenersInstalled) return;
+ listenersInstalled = false;
+ writable.removeListener('drain', onDrain);
+ writable.removeListener('error', onError);
+ }
+
+ function waitForDrain() {
+ const { promise, resolve, reject } = PromiseWithResolvers();
+ waiters.push({ __proto__: null, resolve, reject });
+ installListeners();
+ return promise;
+ }
+
+ function isWritable() {
+ // Duck-typed streams may not have these properties -- treat missing
+ // as false (i.e., writable is still open).
+ return !(writable.destroyed ?? false) &&
+ !(writable.writableFinished ?? false) &&
+ !(writable.writableEnded ?? false);
+ }
+
+ function isFull() {
+ return (writable.writableLength ?? 0) >= hwm;
+ }
+
+ const writer = {
+ __proto__: null,
+
+ get desiredSize() {
+ if (!isWritable()) return null;
+ return MathMax(0, hwm - (writable.writableLength ?? 0));
+ },
+
+ writeSync(chunk) {
+ return false;
+ },
+
+ writevSync(chunks) {
+ return false;
+ },
+
+ // Backpressure semantics: write() resolves when the data is accepted
+ // into the Writable's internal buffer, NOT when _write() has flushed
+ // it to the underlying resource. This matches the Writer spec -- the
+ // PushWriter resolves on buffer acceptance too. Classic Writable flow
+ // control works the same way: write rapidly until write() returns
+ // false, then wait for 'drain'. The _write callback is involved in
+ // backpressure indirectly -- 'drain' fires after callbacks drain the
+ // buffer below highWaterMark. Per-write errors from _write surface
+ // as 'error' events caught by our generic error handler, rejecting
+ // the next pending operation rather than the already-resolved one.
+ //
+ // The options.signal parameter from the Writer interface is ignored.
+ // Classic stream.Writable has no per-write abort signal support;
+ // cancellation should be handled at the pipeline level instead.
+ write(chunk) {
+ if (!isWritable()) {
+ return PromiseReject(new ERR_STREAM_WRITE_AFTER_END());
+ }
+
+ let bytes;
+ try {
+ bytes = toUint8Array(chunk);
+ } catch (err) {
+ return PromiseReject(err);
+ }
+
+ if (backpressure === 'strict' && isFull()) {
+ return PromiseReject(new ERR_INVALID_STATE.RangeError(
+ 'Backpressure violation: buffer is full. ' +
+ 'Await each write() call to respect backpressure.'));
+ }
+
+ if (backpressure === 'drop-newest' && isFull()) {
+ // Silently discard. Still count bytes for consistency with
+ // PushWriter, which counts dropped bytes in totalBytes.
+ totalBytes += TypedArrayPrototypeGetByteLength(bytes);
+ return PromiseResolve();
+ }
+
+ totalBytes += TypedArrayPrototypeGetByteLength(bytes);
+ const ok = writable.write(bytes);
+ if (ok) return PromiseResolve();
+
+ // backpressure === 'block' (or strict with room that filled on
+ // this write -- writable.write() accepted the data but returned
+ // false indicating the buffer is now at/over hwm).
+ if (backpressure === 'block') {
+ return waitForDrain();
+ }
+
+ // strict: the write was accepted (there was room before writing)
+ // but the buffer is now full. Resolve -- the *next* write will
+ // be rejected if the caller ignores backpressure.
+ return PromiseResolve();
+ },
+
+ writev(chunks) {
+ if (!ArrayIsArray(chunks)) {
+ throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks);
+ }
+ if (!isWritable()) {
+ return PromiseReject(new ERR_STREAM_WRITE_AFTER_END());
+ }
+
+ if (backpressure === 'strict' && isFull()) {
+ return PromiseReject(new ERR_INVALID_STATE.RangeError(
+ 'Backpressure violation: buffer is full. ' +
+ 'Await each write() call to respect backpressure.'));
+ }
+
+ if (backpressure === 'drop-newest' && isFull()) {
+ // Discard entire batch.
+ for (let i = 0; i < chunks.length; i++) {
+ totalBytes +=
+ TypedArrayPrototypeGetByteLength(toUint8Array(chunks[i]));
+ }
+ return PromiseResolve();
+ }
+
+ if (typeof writable.cork === 'function') writable.cork();
+ let ok = true;
+ for (let i = 0; i < chunks.length; i++) {
+ const bytes = toUint8Array(chunks[i]);
+ totalBytes += TypedArrayPrototypeGetByteLength(bytes);
+ ok = writable.write(bytes);
+ }
+ if (typeof writable.uncork === 'function') writable.uncork();
+
+ if (ok) return PromiseResolve();
+
+ if (backpressure === 'block') {
+ return waitForDrain();
+ }
+
+ return PromiseResolve();
+ },
+
+ endSync() {
+ return -1;
+ },
+
+ // options.signal is ignored for the same reason as write().
+ end() {
+ if ((writable.writableFinished ?? false) ||
+ (writable.destroyed ?? false)) {
+ cleanup();
+ return PromiseResolve(totalBytes);
+ }
+
+ const { promise, resolve, reject } = PromiseWithResolvers();
+
+ if (!(writable.writableEnded ?? false)) {
+ writable.end();
+ }
+
+ eos(writable, { writable: true, readable: false }, (err) => {
+ cleanup(err);
+ if (err) reject(err);
+ else resolve(totalBytes);
+ });
+
+ return promise;
+ },
+
+ fail(reason) {
+ cleanup(reason);
+ if (typeof writable.destroy === 'function') {
+ writable.destroy(reason);
+ }
+ },
+
+ [SymbolAsyncDispose]() {
+ if (isWritable()) {
+ cleanup();
+ if (typeof writable.destroy === 'function') {
+ writable.destroy();
+ }
+ }
+ return PromiseResolve();
+ },
+
+ [SymbolDispose]() {
+ if (isWritable()) {
+ cleanup();
+ if (typeof writable.destroy === 'function') {
+ writable.destroy();
+ }
+ }
+ },
+ };
+
+ // drainableProtocol
+ writer[drainableProtocol] = function() {
+ if (!isWritable()) return null;
+ if ((writable.writableLength ?? 0) < hwm) {
+ return PromiseResolve(true);
+ }
+ const { promise, resolve } = PromiseWithResolvers();
+ waiters.push({
+ __proto__: null,
+ resolve() { resolve(true); },
+ reject() { resolve(false); },
+ });
+ installListeners();
+ return promise;
+ };
+
+ fromWritableCache.set(writable, writer);
+ return writer;
+}
+
+
+// ============================================================================
+// toWritable(writer) -- stream/iter Writer -> classic Writable
+// ============================================================================
+
+/**
+ * Create a classic stream.Writable backed by a stream/iter Writer.
+ * Each _write/_writev call delegates to the Writer's methods,
+ * attempting the sync path first (writeSync/writevSync/endSync) and
+ * falling back to async if the sync path returns false or throws.
+ * @param {object} writer - A stream/iter Writer (only write() is required).
+ * @returns {stream.Writable}
+ */
+function toWritable(writer) {
+ if (typeof writer?.write !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('writer', 'Writer', writer);
+ }
+
+ const WritableCtor = lazyWritable();
+
+ const hasWriteSync = typeof writer.writeSync === 'function';
+ const hasWritev = typeof writer.writev === 'function';
+ const hasWritevSync = hasWritev &&
+ typeof writer.writevSync === 'function';
+ const hasEnd = typeof writer.end === 'function';
+ const hasEndSync = hasEnd &&
+ typeof writer.endSync === 'function';
+ const hasFail = typeof writer.fail === 'function';
+
+ // Try-sync-first pattern: attempt the synchronous method and
+ // fall back to the async method if it returns false (indicating
+ // the sync path was not accepted) or throws. When the sync path
+ // succeeds, the callback is deferred via queueMicrotask to
+ // preserve the async resolution contract that Writable internals
+ // expect from _write/_writev/_final callbacks.
+
+ function _write(chunk, encoding, cb) {
+ const bytes = typeof chunk === 'string' ?
+ Buffer.from(chunk, encoding) : chunk;
+ if (hasWriteSync) {
+ try {
+ if (writer.writeSync(bytes)) {
+ queueMicrotask(cb);
+ return;
+ }
+ } catch {
+ // Sync path threw -- fall through to async.
+ }
+ }
+ try {
+ PromisePrototypeThen(writer.write(bytes), () => cb(), cb);
+ } catch (err) {
+ cb(err);
+ }
+ }
+
+ function _writev(entries, cb) {
+ const chunks = [];
+ for (let i = 0; i < entries.length; i++) {
+ const { chunk, encoding } = entries[i];
+ chunks[i] = typeof chunk === 'string' ?
+ Buffer.from(chunk, encoding) : chunk;
+ }
+ if (hasWritevSync) {
+ try {
+ if (writer.writevSync(chunks)) {
+ queueMicrotask(cb);
+ return;
+ }
+ } catch {
+ // Sync path threw -- fall through to async.
+ }
+ }
+ try {
+ PromisePrototypeThen(writer.writev(chunks), () => cb(), cb);
+ } catch (err) {
+ cb(err);
+ }
+ }
+
+ function _final(cb) {
+ if (!hasEnd) {
+ queueMicrotask(cb);
+ return;
+ }
+ if (hasEndSync) {
+ try {
+ const result = writer.endSync();
+ if (result >= 0) {
+ queueMicrotask(cb);
+ return;
+ }
+ } catch {
+ // Sync path threw -- fall through to async.
+ }
+ }
+ try {
+ PromisePrototypeThen(writer.end(), () => cb(), cb);
+ } catch (err) {
+ cb(err);
+ }
+ }
+
+ function _destroy(err, cb) {
+ if (err && hasFail) {
+ writer.fail(err);
+ }
+ cb();
+ }
+
+ const writableOptions = {
+ __proto__: null,
+ // Use MAX_SAFE_INTEGER to effectively disable the Writable's
+ // internal buffering. The underlying stream/iter Writer has its
+ // own backpressure handling; we want _write to be called
+ // immediately so the Writer can manage flow control directly.
+ highWaterMark: NumberMAX_SAFE_INTEGER,
+ write: _write,
+ final: _final,
+ destroy: _destroy,
+ };
+
+ if (hasWritev) {
+ writableOptions.writev = _writev;
+ }
+
+ return new WritableCtor(writableOptions);
+}
+
+
+module.exports = {
+ // Shared helpers used by Readable.prototype[toAsyncStreamable] in
+ // readable.js to avoid duplicating the batched iterator logic.
+ createBatchedAsyncIterator,
+ normalizeBatch,
+
+ // Public utilities exported from 'stream/iter'.
+ fromReadable,
+ fromWritable,
+ toReadable,
+ toReadableSync,
+ toWritable,
+};
diff --git a/lib/internal/streams/iter/from.js b/lib/internal/streams/iter/from.js
index d76f430ab0d51e..0533f0e3810398 100644
--- a/lib/internal/streams/iter/from.js
+++ b/lib/internal/streams/iter/from.js
@@ -37,6 +37,7 @@ const {
} = require('internal/util/types');
const {
+ kValidatedSource,
toStreamable,
toAsyncStreamable,
} = require('internal/streams/iter/types');
@@ -483,6 +484,11 @@ function from(input) {
throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input);
}
+ // Fast path: validated source already yields valid Uint8Array[] batches
+ if (input[kValidatedSource]) {
+ return input;
+ }
+
// Check for primitives first (ByteInput)
if (isPrimitiveChunk(input)) {
const chunk = primitiveToUint8Array(input);
@@ -531,11 +537,22 @@ function from(input) {
// Check toAsyncStreamable protocol (takes precedence over toStreamable and
// iteration protocols)
if (typeof input[toAsyncStreamable] === 'function') {
+ const result = input[toAsyncStreamable]();
+ // Synchronous validated source (e.g. Readable batched iterator)
+ if (result?.[kValidatedSource]) {
+ return result;
+ }
return {
__proto__: null,
async *[SymbolAsyncIterator]() {
- const result = await input[toAsyncStreamable]();
- yield* from(result)[SymbolAsyncIterator]();
+        // The result may be a thenable; await it, then check whether the
+        // resolved value is itself a validated batched source.
+ const resolved = await result;
+ if (resolved?.[kValidatedSource]) {
+ yield* resolved[SymbolAsyncIterator]();
+ return;
+ }
+ yield* from(resolved)[SymbolAsyncIterator]();
},
};
}
diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js
index 8c49941ddb54ed..df5ca2826a9ac5 100644
--- a/lib/internal/streams/iter/pull.js
+++ b/lib/internal/streams/iter/pull.js
@@ -51,7 +51,7 @@ const {
} = require('internal/streams/iter/utils');
const {
- kTrustedTransform,
+ kValidatedTransform,
} = require('internal/streams/iter/types');
// =============================================================================
@@ -564,12 +564,12 @@ async function* applyStatefulAsyncTransform(source, transform, options) {
}
/**
- * Fast path for trusted stateful transforms (e.g. compression).
+ * Fast path for validated stateful transforms (e.g. compression).
* Skips withFlushAsync (transform handles done internally) and
* skips isUint8ArrayBatch validation (transform guarantees valid output).
* @yields {Uint8Array[]}
*/
-async function* applyTrustedStatefulAsyncTransform(source, transform, options) {
+async function* applyValidatedStatefulAsyncTransform(source, transform, options) {
const output = transform(source, options);
for await (const batch of output) {
if (batch.length > 0) {
@@ -639,8 +639,8 @@ async function* createAsyncPipeline(source, transforms, signal) {
statelessRun = [];
}
const opts = { __proto__: null, signal: transformSignal };
- if (transform[kTrustedTransform]) {
- current = applyTrustedStatefulAsyncTransform(
+ if (transform[kValidatedTransform]) {
+ current = applyValidatedStatefulAsyncTransform(
current, transform.transform, opts);
} else {
current = applyStatefulAsyncTransform(
diff --git a/lib/internal/streams/iter/transform.js b/lib/internal/streams/iter/transform.js
index 4cb417ed98ce32..9782f5f50ebf2c 100644
--- a/lib/internal/streams/iter/transform.js
+++ b/lib/internal/streams/iter/transform.js
@@ -37,7 +37,7 @@ const {
} = require('internal/errors');
const { lazyDOMException } = require('internal/util');
const { isArrayBufferView, isAnyArrayBuffer } = require('internal/util/types');
-const { kTrustedTransform } = require('internal/streams/iter/types');
+const { kValidatedTransform } = require('internal/streams/iter/types');
const {
checkRangesOrGetDefault,
validateFiniteNumber,
@@ -306,7 +306,7 @@ function createZstdHandle(mode, options, processCallback, onError) {
function makeZlibTransform(createHandleFn, processFlag, finishFlag) {
return {
__proto__: null,
- [kTrustedTransform]: true,
+ [kValidatedTransform]: true,
transform: async function*(source, options) {
const { signal } = options;
diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js
index c205db00e3782a..99ddc8fd582770 100644
--- a/lib/internal/streams/iter/types.js
+++ b/lib/internal/streams/iter/types.js
@@ -45,20 +45,30 @@ const shareSyncProtocol = SymbolFor('Stream.shareSyncProtocol');
const drainableProtocol = SymbolFor('Stream.drainableProtocol');
/**
- * Internal sentinel for trusted stateful transforms. A transform object
- * with [kTrustedTransform] = true signals that:
+ * Internal sentinel for validated stateful transforms. A transform object
+ * with [kValidatedTransform] = true signals that:
* 1. It handles source exhaustion (done) internally - no withFlushAsync
* wrapper needed.
* 2. It always yields valid Uint8Array[] batches - no isUint8ArrayBatch
* validation needed on each yield.
* This is NOT a public protocol symbol - it uses Symbol() not Symbol.for().
*/
-const kTrustedTransform = Symbol('kTrustedTransform');
+const kValidatedTransform = Symbol('kValidatedTransform');
+
+/**
+ * Internal sentinel for validated sources. An async iterable with
+ * [kValidatedSource] = true signals that it already yields valid
+ * Uint8Array[] batches - no normalizeAsyncSource wrapper needed.
+ * from() will return such sources directly, skipping all normalization.
+ * This is NOT a public protocol symbol - it uses Symbol() not Symbol.for().
+ */
+const kValidatedSource = Symbol('kValidatedSource');
module.exports = {
broadcastProtocol,
drainableProtocol,
- kTrustedTransform,
+ kValidatedSource,
+ kValidatedTransform,
shareProtocol,
shareSyncProtocol,
toAsyncStreamable,
diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
index 919c527a2be6f8..8ee754edcad14b 100644
--- a/lib/internal/streams/readable.js
+++ b/lib/internal/streams/readable.js
@@ -35,6 +35,7 @@ const {
Symbol,
SymbolAsyncDispose,
SymbolAsyncIterator,
+ SymbolFor,
SymbolSpecies,
TypedArrayPrototypeSet,
} = primordials;
@@ -52,6 +53,8 @@ const {
} = require('internal/streams/add-abort-signal');
const { eos } = require('internal/streams/end-of-stream');
+const { getOptionValue } = require('internal/options');
+
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
});
@@ -82,6 +85,7 @@ const {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_OUT_OF_RANGE,
+ ERR_STREAM_ITER_MISSING_FLAG,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
ERR_UNKNOWN_ENCODING,
@@ -1796,3 +1800,42 @@ Readable.wrap = function(src, options) {
},
}).wrap(src);
};
+
+// Interop with the stream/iter API via the toAsyncStreamable protocol.
+//
+// The batched iterator logic lives in classic.js (shared with the
+// fromReadable() utility for duck-typed streams). This prototype method
+// calls createBatchedAsyncIterator directly -- it must NOT call
+// fromReadable() since fromReadable() checks for toAsyncStreamable,
+// which would create infinite recursion.
+//
+// The flag cannot be checked at module load time (readable.js loads during
+// bootstrap before options are available). Instead, toAsyncStreamable is
+// always defined but lazily initializes on first call -- throwing if the
+// flag is not set.
+{
+ const toAsyncStreamable = SymbolFor('Stream.toAsyncStreamable');
+ let createBatchedAsyncIterator;
+ let normalizeBatch;
+ let kValidatedSource;
+
+ Readable.prototype[toAsyncStreamable] = function() {
+ if (createBatchedAsyncIterator === undefined) {
+ if (!getOptionValue('--experimental-stream-iter')) {
+ throw new ERR_STREAM_ITER_MISSING_FLAG();
+ }
+ ({
+ createBatchedAsyncIterator,
+ normalizeBatch,
+ } = require('internal/streams/iter/classic'));
+ ({ kValidatedSource } = require('internal/streams/iter/types'));
+ }
+ const state = this._readableState;
+ const normalize = (state.objectMode || state.encoding) ?
+ normalizeBatch : null;
+ const iter = createBatchedAsyncIterator(this, normalize);
+ iter[kValidatedSource] = true;
+ iter.stream = this;
+ return iter;
+ };
+}
diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js
index e934c38bbf9acc..89d384e9f2fb8f 100644
--- a/lib/internal/streams/writable.js
+++ b/lib/internal/streams/writable.js
@@ -47,7 +47,6 @@ const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
const { eos } = require('internal/streams/end-of-stream');
-
const {
addAbortSignal,
} = require('internal/streams/add-abort-signal');
diff --git a/lib/stream/iter.js b/lib/stream/iter.js
index e77e485a7f2bd5..4cb499bb161694 100644
--- a/lib/stream/iter.js
+++ b/lib/stream/iter.js
@@ -50,6 +50,15 @@ const {
ondrain,
} = require('internal/streams/iter/consumers');
+// Classic stream interop (Node.js-specific, not part of the spec)
+const {
+ fromReadable,
+ fromWritable,
+ toReadable,
+ toReadableSync,
+ toWritable,
+} = require('internal/streams/iter/classic');
+
// Multi-consumer
const { broadcast, Broadcast } = require('internal/streams/iter/broadcast');
const {
@@ -177,4 +186,11 @@ module.exports = {
tap,
tapSync,
ondrain,
+
+ // Classic stream interop
+ fromReadable,
+ fromWritable,
+ toReadable,
+ toReadableSync,
+ toWritable,
};
diff --git a/test/parallel/test-stream-iter-readable-interop-disabled.js b/test/parallel/test-stream-iter-readable-interop-disabled.js
new file mode 100644
index 00000000000000..86c8cc141387cd
--- /dev/null
+++ b/test/parallel/test-stream-iter-readable-interop-disabled.js
@@ -0,0 +1,57 @@
+'use strict';
+
+// Tests that toAsyncStreamable throws ERR_STREAM_ITER_MISSING_FLAG
+// when --experimental-stream-iter is not enabled.
+
+const common = require('../common');
+const assert = require('assert');
+const { spawnPromisified } = common;
+
+async function testToAsyncStreamableWithoutFlag() {
+ const { stderr, code } = await spawnPromisified(process.execPath, [
+ '-e',
+ `
+ const { Readable } = require('stream');
+ const r = new Readable({ read() {} });
+ r[Symbol.for('Stream.toAsyncStreamable')]();
+ `,
+ ]);
+ assert.notStrictEqual(code, 0);
+ assert.match(stderr, /ERR_STREAM_ITER_MISSING_FLAG/);
+}
+
+async function testToAsyncStreamableWithFlag() {
+ const { code } = await spawnPromisified(process.execPath, [
+ '--experimental-stream-iter',
+ '-e',
+ `
+ const { Readable } = require('stream');
+ const r = new Readable({
+ read() { this.push(Buffer.from('ok')); this.push(null); }
+ });
+ const sym = Symbol.for('Stream.toAsyncStreamable');
+ const iter = r[sym]();
+ // Should not throw, and should have stream property
+ if (!iter.stream) process.exit(1);
+ `,
+ ]);
+ assert.strictEqual(code, 0);
+}
+
+async function testStreamIterModuleWithoutFlag() {
+  // Requiring 'stream/iter' without the flag must fail, because the
+  // module itself is gated behind --experimental-stream-iter.
+ const { code } = await spawnPromisified(process.execPath, [
+ '-e',
+ `
+ require('stream/iter');
+ `,
+ ]);
+ assert.notStrictEqual(code, 0);
+}
+
+Promise.all([
+ testToAsyncStreamableWithoutFlag(),
+ testToAsyncStreamableWithFlag(),
+ testStreamIterModuleWithoutFlag(),
+]).then(common.mustCall());
diff --git a/test/parallel/test-stream-iter-readable-interop.js b/test/parallel/test-stream-iter-readable-interop.js
new file mode 100644
index 00000000000000..8100b54168be87
--- /dev/null
+++ b/test/parallel/test-stream-iter-readable-interop.js
@@ -0,0 +1,640 @@
+// Flags: --experimental-stream-iter
+'use strict';
+
+// Tests for classic Readable stream interop with the stream/iter API
+// via the toAsyncStreamable protocol and kValidatedSource optimization.
+
+const common = require('../common');
+const assert = require('assert');
+const { Readable } = require('stream');
+const {
+ from,
+ pull,
+ bytes,
+ text,
+} = require('stream/iter');
+
+const toAsyncStreamable = Symbol.for('Stream.toAsyncStreamable');
+
+// =============================================================================
+// toAsyncStreamable protocol is present on Readable.prototype
+// =============================================================================
+
+function testProtocolExists() {
+ assert.strictEqual(typeof Readable.prototype[toAsyncStreamable], 'function');
+
+ const readable = new Readable({ read() {} });
+ assert.strictEqual(typeof readable[toAsyncStreamable], 'function');
+}
+
+// =============================================================================
+// Byte-mode Readable: basic round-trip through from()
+// =============================================================================
+
+async function testByteModeThroughFrom() {
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from('hello'));
+ this.push(Buffer.from(' world'));
+ this.push(null);
+ },
+ });
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'hello world');
+}
+
+// =============================================================================
+// Byte-mode Readable: basic round-trip through pull()
+// =============================================================================
+
+async function testByteModeThroughPull() {
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from('pull '));
+ this.push(Buffer.from('test'));
+ this.push(null);
+ },
+ });
+
+ const result = await text(pull(readable));
+ assert.strictEqual(result, 'pull test');
+}
+
+// =============================================================================
+// Byte-mode Readable: bytes consumer
+// =============================================================================
+
+async function testByteModeBytes() {
+ const data = Buffer.from('binary data here');
+ const readable = new Readable({
+ read() {
+ this.push(data);
+ this.push(null);
+ },
+ });
+
+ const result = await bytes(from(readable));
+ assert.deepStrictEqual(result, new Uint8Array(data));
+}
+
+// =============================================================================
+// Byte-mode Readable: batching - multiple buffered chunks yield as one batch
+// =============================================================================
+
+async function testBatchingBehavior() {
+ const readable = new Readable({
+ read() {
+ // Push multiple chunks synchronously so they all buffer
+ for (let i = 0; i < 10; i++) {
+ this.push(Buffer.from(`chunk${i}`));
+ }
+ this.push(null);
+ },
+ });
+
+ const source = from(readable);
+ const batches = [];
+ for await (const batch of source) {
+ batches.push(batch);
+ }
+
+ // All chunks were buffered synchronously, so they should come out
+ // as fewer batches than individual chunks (ideally one batch).
+ assert.ok(batches.length < 10,
+ `Expected fewer batches than chunks, got ${batches.length}`);
+
+ // Total data should be correct
+ const allChunks = batches.flat();
+ const combined = Buffer.concat(allChunks);
+ let expected = '';
+ for (let i = 0; i < 10; i++) {
+ expected += `chunk${i}`;
+ }
+ assert.strictEqual(combined.toString(), expected);
+}
+
+// =============================================================================
+// Byte-mode Readable: kValidatedSource is set
+// =============================================================================
+
+function testTrustedSourceByteMode() {
+ const readable = new Readable({ read() {} });
+ const result = readable[toAsyncStreamable]();
+  // kValidatedSource is a private symbol, so it cannot be inspected here.
+  // Instead verify the returned iterator is async-iterable and references
+  // the source stream; from() pass-through identity is tested separately.
+ assert.strictEqual(typeof result[Symbol.asyncIterator], 'function');
+ assert.strictEqual(result.stream, readable);
+}
+
+// =============================================================================
+// Byte-mode Readable: multi-read with delayed pushes
+// =============================================================================
+
+async function testDelayedPushes() {
+ let pushCount = 0;
+ const readable = new Readable({
+ read() {
+ if (pushCount < 3) {
+ setTimeout(() => {
+ this.push(Buffer.from(`delayed${pushCount}`));
+ pushCount++;
+ if (pushCount === 3) {
+ this.push(null);
+ }
+ }, 10);
+ }
+ },
+ });
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'delayed0delayed1delayed2');
+}
+
+// =============================================================================
+// Byte-mode Readable: empty stream
+// =============================================================================
+
+async function testEmptyStream() {
+ const readable = new Readable({
+ read() {
+ this.push(null);
+ },
+ });
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, '');
+}
+
+// =============================================================================
+// Byte-mode Readable: error propagation
+// =============================================================================
+
+async function testErrorPropagation() {
+ const readable = new Readable({
+ read() {
+ process.nextTick(() => this.destroy(new Error('test error')));
+ },
+ });
+
+ await assert.rejects(
+ text(from(readable)),
+ (err) => err.message === 'test error',
+ );
+}
+
+// =============================================================================
+// Byte-mode Readable: with transforms
+// =============================================================================
+
+async function testWithTransform() {
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from('hello'));
+ this.push(null);
+ },
+ });
+
+ // Uppercase transform
+ function uppercase(chunks) {
+ if (chunks === null) return null;
+ return chunks.map((c) => {
+ const buf = Buffer.from(c);
+ for (let i = 0; i < buf.length; i++) {
+ if (buf[i] >= 97 && buf[i] <= 122) buf[i] -= 32;
+ }
+ return buf;
+ });
+ }
+
+ const result = await text(pull(readable, uppercase));
+ assert.strictEqual(result, 'HELLO');
+}
+
+// =============================================================================
+// Object-mode Readable: strings are normalized to Uint8Array
+// =============================================================================
+
+async function testObjectModeStrings() {
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ this.push('hello');
+ this.push(' object');
+ this.push(' mode');
+ this.push(null);
+ },
+ });
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'hello object mode');
+}
+
+// =============================================================================
+// Object-mode Readable: Uint8Array chunks pass through
+// =============================================================================
+
+async function testObjectModeUint8Array() {
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ this.push(new Uint8Array([72, 73])); // "HI"
+ this.push(null);
+ },
+ });
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'HI');
+}
+
+// =============================================================================
+// Object-mode Readable: mixed types (strings + Uint8Array)
+// =============================================================================
+
+async function testObjectModeMixed() {
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ this.push('hello');
+ this.push(Buffer.from(' '));
+ this.push('world');
+ this.push(null);
+ },
+ });
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'hello world');
+}
+
+// =============================================================================
+// Object-mode Readable: toStreamable protocol objects
+// =============================================================================
+
+async function testObjectModeToStreamable() {
+ const toStreamableSym = Symbol.for('Stream.toStreamable');
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ this.push({
+ [toStreamableSym]() {
+ return 'from-protocol';
+ },
+ });
+ this.push(null);
+ },
+ });
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'from-protocol');
+}
+
+// =============================================================================
+// Object-mode Readable: kValidatedSource is set
+// =============================================================================
+
+function testTrustedSourceObjectMode() {
+ const readable = new Readable({ objectMode: true, read() {} });
+ const result = readable[toAsyncStreamable]();
+ assert.strictEqual(typeof result[Symbol.asyncIterator], 'function');
+ assert.strictEqual(result.stream, readable);
+}
+
+// =============================================================================
+// Encoded Readable: strings are re-encoded to Uint8Array
+// =============================================================================
+
+async function testEncodedReadable() {
+ const readable = new Readable({
+ encoding: 'utf8',
+ read() {
+ this.push(Buffer.from('encoded'));
+ this.push(null);
+ },
+ });
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'encoded');
+}
+
+// =============================================================================
+// Readable.from() source: verify interop with Readable.from()
+// =============================================================================
+
+async function testReadableFrom() {
+ const readable = Readable.from(['chunk1', 'chunk2', 'chunk3']);
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'chunk1chunk2chunk3');
+}
+
+// =============================================================================
+// Byte-mode Readable: large data
+// =============================================================================
+
+async function testLargeData() {
+ const totalSize = 1024 * 1024; // 1 MB
+ const chunkSize = 16384;
+ let pushed = 0;
+
+ const readable = new Readable({
+ read() {
+ if (pushed < totalSize) {
+ const size = Math.min(chunkSize, totalSize - pushed);
+ const buf = Buffer.alloc(size, 0x41); // Fill with 'A'
+ this.push(buf);
+ pushed += size;
+ } else {
+ this.push(null);
+ }
+ },
+ });
+
+ const result = await bytes(from(readable));
+ assert.strictEqual(result.length, totalSize);
+ assert.strictEqual(result[0], 0x41);
+ assert.strictEqual(result[totalSize - 1], 0x41);
+}
+
+// =============================================================================
+// Byte-mode Readable: consumer return (early termination)
+// =============================================================================
+
+async function testEarlyTermination() {
+ let pushCount = 0;
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from(`chunk${pushCount++}`));
+ // Never pushes null - infinite stream
+ },
+ });
+
+ // Take only the first batch
+ const source = from(readable);
+ const batches = [];
+ for await (const batch of source) {
+ batches.push(batch);
+ break; // Stop after first batch
+ }
+
+ assert.ok(batches.length >= 1);
+ // Stream should be destroyed after consumer return
+ // Give it a tick to clean up
+ await new Promise((resolve) => setTimeout(resolve, 50));
+ assert.ok(readable.destroyed);
+}
+
+// =============================================================================
+// Byte-mode Readable: pull() with compression transform
+// =============================================================================
+
+async function testWithCompression() {
+ const {
+ compressGzip,
+ decompressGzip,
+ } = require('zlib/iter');
+
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from('compress me via classic Readable'));
+ this.push(null);
+ },
+ });
+
+ const compressed = pull(readable, compressGzip());
+ const result = await text(pull(compressed, decompressGzip()));
+ assert.strictEqual(result, 'compress me via classic Readable');
+}
+
+// =============================================================================
+// Object-mode Readable: error propagation
+// =============================================================================
+
+async function testObjectModeError() {
+ const readable = new Readable({
+ objectMode: true,
+ read() {
+ process.nextTick(() => this.destroy(new Error('object error')));
+ },
+ });
+
+ await assert.rejects(
+ text(from(readable)),
+ (err) => err.message === 'object error',
+ );
+}
+
+// =============================================================================
+// Stream destroyed mid-iteration
+// =============================================================================
+
+async function testDestroyMidIteration() {
+ let pushCount = 0;
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from(`chunk${pushCount++}`));
+ // Never pushes null - infinite stream
+ },
+ });
+
+ const chunks = [];
+ await assert.rejects(async () => {
+ for await (const batch of from(readable)) {
+ chunks.push(...batch);
+ if (chunks.length >= 3) {
+ readable.destroy();
+ }
+ }
+ }, { code: 'ERR_STREAM_PREMATURE_CLOSE' });
+ assert.ok(readable.destroyed);
+ assert.ok(chunks.length >= 3);
+}
+
+// =============================================================================
+// Error after partial data
+// =============================================================================
+
+async function testErrorAfterPartialData() {
+ let count = 0;
+ const readable = new Readable({
+ read() {
+ if (count < 3) {
+ this.push(Buffer.from(`ok${count++}`));
+ } else {
+ this.destroy(new Error('late error'));
+ }
+ },
+ });
+
+ const chunks = [];
+ await assert.rejects(async () => {
+ for await (const batch of from(readable)) {
+ chunks.push(...batch);
+ }
+ }, { message: 'late error' });
+ assert.ok(chunks.length > 0, 'Should have received partial data');
+}
+
+// =============================================================================
+// Multiple consumers (second iteration yields empty)
+// =============================================================================
+
+async function testMultipleConsumers() {
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from('data'));
+ this.push(null);
+ },
+ });
+
+ const first = await text(from(readable));
+ assert.strictEqual(first, 'data');
+
+ // Second consumption - stream is already consumed/destroyed
+ const second = await text(from(readable));
+ assert.strictEqual(second, '');
+}
+
+// =============================================================================
+// highWaterMark: 0 - each chunk becomes its own batch
+// =============================================================================
+
+async function testHighWaterMarkZero() {
+ let pushCount = 0;
+ const readable = new Readable({
+ highWaterMark: 0,
+ read() {
+ if (pushCount < 3) {
+ this.push(Buffer.from(`hwm0-${pushCount++}`));
+ } else {
+ this.push(null);
+ }
+ },
+ });
+
+ const batches = [];
+ for await (const batch of from(readable)) {
+ batches.push(batch);
+ }
+
+ // With HWM=0, buffer is always empty so drain loop never fires.
+ // Each chunk should be its own batch.
+ assert.strictEqual(batches.length, 3);
+ const combined = Buffer.concat(batches.flat());
+ assert.strictEqual(combined.toString(), 'hwm0-0hwm0-1hwm0-2');
+}
+
+// =============================================================================
+// Duplex stream (Duplex extends Readable, toAsyncStreamable should work)
+// =============================================================================
+
+async function testDuplexStream() {
+ const { Duplex } = require('stream');
+
+ const duplex = new Duplex({
+ read() {
+ this.push(Buffer.from('duplex-data'));
+ this.push(null);
+ },
+ write(chunk, enc, cb) { cb(); },
+ });
+
+ const result = await text(from(duplex));
+ assert.strictEqual(result, 'duplex-data');
+}
+
+// =============================================================================
+// setEncoding called dynamically after construction
+// =============================================================================
+
+async function testSetEncodingDynamic() {
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from('dynamic-enc'));
+ this.push(null);
+ },
+ });
+
+ readable.setEncoding('utf8');
+
+ const result = await text(from(readable));
+ assert.strictEqual(result, 'dynamic-enc');
+}
+
+// =============================================================================
+// AbortSignal cancellation
+// =============================================================================
+
+async function testAbortSignal() {
+ let pushCount = 0;
+ const readable = new Readable({
+ read() {
+ this.push(Buffer.from(`sig${pushCount++}`));
+ // Never pushes null - infinite stream
+ },
+ });
+
+ const ac = new AbortController();
+ const chunks = [];
+
+ await assert.rejects(async () => {
+ for await (const batch of pull(readable, { signal: ac.signal })) {
+ chunks.push(...batch);
+ if (chunks.length >= 2) {
+ ac.abort();
+ }
+ }
+ }, { name: 'AbortError' });
+ assert.ok(chunks.length >= 2);
+}
+
+// =============================================================================
+// kValidatedSource identity - from() returns same object for validated sources
+// =============================================================================
+
+function testTrustedSourceIdentity() {
+ const readable = new Readable({ read() {} });
+ const iter = readable[toAsyncStreamable]();
+
+ // from() should return the validated iterator directly (same reference),
+ // not wrap it in another generator
+ const result = from(iter);
+ assert.strictEqual(result, iter);
+}
+
+// =============================================================================
+// Run all tests
+// =============================================================================
+
+testProtocolExists();
+testTrustedSourceByteMode();
+testTrustedSourceObjectMode();
+testTrustedSourceIdentity();
+
+Promise.all([
+ testByteModeThroughFrom(),
+ testByteModeThroughPull(),
+ testByteModeBytes(),
+ testBatchingBehavior(),
+ testDelayedPushes(),
+ testEmptyStream(),
+ testErrorPropagation(),
+ testWithTransform(),
+ testObjectModeStrings(),
+ testObjectModeUint8Array(),
+ testObjectModeMixed(),
+ testObjectModeToStreamable(),
+ testEncodedReadable(),
+ testReadableFrom(),
+ testLargeData(),
+ testEarlyTermination(),
+ testWithCompression(),
+ testObjectModeError(),
+ testDestroyMidIteration(),
+ testErrorAfterPartialData(),
+ testMultipleConsumers(),
+ testHighWaterMarkZero(),
+ testDuplexStream(),
+ testSetEncodingDynamic(),
+ testAbortSignal(),
+]).then(common.mustCall());
diff --git a/test/parallel/test-stream-iter-to-readable.js b/test/parallel/test-stream-iter-to-readable.js
new file mode 100644
index 00000000000000..4cb5600e3ba424
--- /dev/null
+++ b/test/parallel/test-stream-iter-to-readable.js
@@ -0,0 +1,620 @@
+// Flags: --experimental-stream-iter
+'use strict';
+
+// Tests for toReadable() and toReadableSync() which create byte-mode
+// Readable streams from stream/iter sources.
+
+const common = require('../common');
+const assert = require('assert');
+const { Writable } = require('stream');
+const {
+ from,
+ fromSync,
+ pull,
+ text,
+ toReadable,
+ toReadableSync,
+} = require('stream/iter');
+
+function collect(readable) {
+ return new Promise((resolve, reject) => {
+ const chunks = [];
+ readable.on('data', (chunk) => chunks.push(chunk));
+ readable.on('end', () => resolve(Buffer.concat(chunks)));
+ readable.on('error', reject);
+ });
+}
+
+// =============================================================================
+// fromStreamIter: basic async round-trip
+// =============================================================================
+
+async function testBasicAsync() {
+ const source = from('hello world');
+ const readable = toReadable(source);
+
+ assert.strictEqual(readable.readableObjectMode, false);
+
+ const result = await collect(readable);
+ assert.strictEqual(result.toString(), 'hello world');
+}
+
+// =============================================================================
+// fromStreamIter: multiple batches, all chunks arrive in order
+// =============================================================================
+
+async function testMultiBatchAsync() {
+ async function* gen() {
+ for (let i = 0; i < 5; i++) {
+ const batch = [];
+ for (let j = 0; j < 3; j++) {
+ batch.push(Buffer.from(`${i}-${j} `));
+ }
+ yield batch;
+ }
+ }
+
+ const readable = toReadable(gen());
+ const result = await collect(readable);
+ assert.strictEqual(result.toString(),
+ '0-0 0-1 0-2 1-0 1-1 1-2 2-0 2-1 2-2 3-0 3-1 3-2 4-0 4-1 4-2 ');
+}
+
+// =============================================================================
+// fromStreamIter: backpressure with small highWaterMark
+// =============================================================================
+
+async function testBackpressureAsync() {
+ let batchCount = 0;
+ async function* gen() {
+ for (let i = 0; i < 10; i++) {
+ batchCount++;
+ yield [Buffer.from(`chunk${i}`)];
+ }
+ }
+
+ const readable = toReadable(gen(), { highWaterMark: 1 });
+
+  // Read chunks one at a time via async iteration to exercise backpressure
+ const chunks = [];
+ for await (const chunk of readable) {
+ chunks.push(chunk);
+ }
+
+ assert.strictEqual(chunks.length, 10);
+ assert.strictEqual(batchCount, 10);
+}
+
+// =============================================================================
+// fromStreamIter: source error mid-stream
+// =============================================================================
+
+async function testErrorAsync() {
+ async function* gen() {
+ yield [Buffer.from('ok')];
+ throw new Error('source failed');
+ }
+
+ const readable = toReadable(gen());
+
+ await assert.rejects(async () => {
+ // eslint-disable-next-line no-unused-vars
+ for await (const chunk of readable) {
+ // Consume until error
+ }
+ }, { message: 'source failed' });
+}
+
+// =============================================================================
+// fromStreamIter: empty source
+// =============================================================================
+
+async function testEmptyAsync() {
+ async function* gen() {
+ // yields nothing
+ }
+
+ const readable = toReadable(gen());
+ const result = await collect(readable);
+ assert.strictEqual(result.length, 0);
+}
+
+// =============================================================================
+// fromStreamIter: empty batches are skipped
+// =============================================================================
+
+async function testEmptyBatchAsync() {
+ async function* gen() {
+ yield [];
+ yield [Buffer.from('real data')];
+ yield [];
+ }
+
+ const readable = toReadable(gen());
+ const result = await collect(readable);
+ assert.strictEqual(result.toString(), 'real data');
+}
+
+// =============================================================================
+// fromStreamIter: destroy mid-stream cleans up iterator
+// =============================================================================
+
+async function testDestroyAsync() {
+ let returnCalled = false;
+ async function* gen() {
+ try {
+ for (let i = 0; ; i++) {
+ yield [Buffer.from(`chunk${i}`)];
+ }
+ } finally {
+ returnCalled = true;
+ }
+ }
+
+ const readable = toReadable(gen());
+
+ // Read a couple chunks then destroy
+ const chunks = [];
+ await new Promise((resolve, reject) => {
+ readable.on('data', (chunk) => {
+ chunks.push(chunk);
+ if (chunks.length >= 3) {
+ readable.destroy();
+ }
+ });
+ readable.on('close', resolve);
+ readable.on('error', reject);
+ });
+
+ assert.ok(chunks.length >= 3);
+ assert.ok(returnCalled, 'iterator.return() should have been called');
+}
+
+// =============================================================================
+// fromStreamIter: destroy while pump is waiting on backpressure
+// =============================================================================
+
+async function testDestroyDuringBackpressure() {
+ let returnCalled = false;
+ async function* gen() {
+ try {
+ // Yield a large batch that will trigger backpressure with HWM=1
+ yield [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')];
+ // This should never be reached
+ yield [Buffer.from('d')];
+ } finally {
+ returnCalled = true;
+ }
+ }
+
+ const readable = toReadable(gen(), { highWaterMark: 1 });
+
+ // Read one chunk to start the pump, then destroy while it's waiting
+ const chunk = await new Promise((resolve) => {
+ readable.once('readable', () => resolve(readable.read()));
+ });
+ assert.ok(chunk);
+
+ // The pump should be waiting on backpressure now. Destroy the stream.
+ readable.destroy();
+
+ await new Promise((resolve) => readable.on('close', resolve));
+ assert.ok(readable.destroyed);
+ assert.ok(returnCalled, 'iterator.return() should have been called');
+}
+
+// =============================================================================
+// fromStreamIter: large data integrity
+// =============================================================================
+
+async function testLargeDataAsync() {
+ const totalSize = 1024 * 1024; // 1 MB
+ const chunkSize = 16384;
+
+ async function* gen() {
+ let remaining = totalSize;
+ while (remaining > 0) {
+ const size = Math.min(chunkSize, remaining);
+ yield [Buffer.alloc(size, 0x42)]; // Fill with 'B'
+ remaining -= size;
+ }
+ }
+
+ const readable = toReadable(gen());
+ const result = await collect(readable);
+ assert.strictEqual(result.length, totalSize);
+ assert.strictEqual(result[0], 0x42);
+ assert.strictEqual(result[totalSize - 1], 0x42);
+}
+
+// =============================================================================
+// fromStreamIter: pipe to Writable
+// =============================================================================
+
+async function testPipeAsync() {
+ const source = from('pipe test data');
+ const readable = toReadable(source);
+
+ const chunks = [];
+ const writable = new Writable({
+ write(chunk, enc, cb) {
+ chunks.push(chunk);
+ cb();
+ },
+ });
+
+ await new Promise((resolve, reject) => {
+ readable.pipe(writable);
+ writable.on('finish', resolve);
+ writable.on('error', reject);
+ });
+
+ assert.strictEqual(Buffer.concat(chunks).toString(), 'pipe test data');
+}
+
+// =============================================================================
+// fromStreamIter: not object mode
+// =============================================================================
+
+function testNotObjectMode() {
+ async function* gen() { yield [Buffer.from('x')]; }
+ const readable = toReadable(gen());
+ assert.strictEqual(readable.readableObjectMode, false);
+}
+
+// =============================================================================
+// fromStreamIter: invalid source throws
+// =============================================================================
+
+function testInvalidSourceAsync() {
+ assert.throws(() => toReadable(42),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+ assert.throws(() => toReadable('not iterable'),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+ assert.throws(() => toReadable(null),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+}
+
+// =============================================================================
+// fromStreamIter: signal aborts the readable
+// =============================================================================
+
+async function testSignalAsync() {
+ async function* gen() {
+ for (let i = 0; ; i++) {
+ yield [Buffer.from(`chunk${i}`)];
+ }
+ }
+
+ const ac = new AbortController();
+ const readable = toReadable(gen(), { signal: ac.signal });
+
+ const chunks = [];
+ await assert.rejects(async () => {
+ for await (const chunk of readable) {
+ chunks.push(chunk);
+ if (chunks.length >= 3) {
+ ac.abort();
+ }
+ }
+ }, { name: 'AbortError' });
+ assert.ok(chunks.length >= 3);
+ assert.ok(readable.destroyed);
+}
+
+// =============================================================================
+// fromStreamIter: signal already aborted
+// =============================================================================
+
+async function testSignalAlreadyAborted() {
+ async function* gen() {
+ yield [Buffer.from('should not reach')];
+ }
+
+ const ac = new AbortController();
+ ac.abort();
+ const readable = toReadable(gen(), { signal: ac.signal });
+
+ await assert.rejects(async () => {
+ // eslint-disable-next-line no-unused-vars
+ for await (const chunk of readable) {
+ // Should not receive any data
+ }
+ }, { name: 'AbortError' });
+ assert.ok(readable.destroyed);
+}
+
+// =============================================================================
+// fromStreamIter: options validation
+// =============================================================================
+
+function testOptionsValidationAsync() {
+ async function* gen() { yield [Buffer.from('x')]; }
+
+ // Options must be an object
+ assert.throws(() => toReadable(gen(), 'bad'),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+ assert.throws(() => toReadable(gen(), 42),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+
+ // highWaterMark must be a non-negative integer
+ assert.throws(() => toReadable(gen(), { highWaterMark: -1 }),
+ { code: 'ERR_OUT_OF_RANGE' });
+ assert.throws(() => toReadable(gen(), { highWaterMark: 'big' }),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+ assert.throws(() => toReadable(gen(), { highWaterMark: 1.5 }),
+ { code: 'ERR_OUT_OF_RANGE' });
+
+ // Signal must be an AbortSignal
+ assert.throws(() => toReadable(gen(), { signal: 'not a signal' }),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+ assert.throws(() => toReadable(gen(), { signal: 42 }),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+
+ // Valid options should work
+ const r = toReadable(gen(), { highWaterMark: 0 });
+ assert.strictEqual(r.readableHighWaterMark, 0);
+ r.destroy();
+
+  // Default highWaterMark should be 64 KiB
+ const r2 = toReadable(gen());
+ assert.strictEqual(r2.readableHighWaterMark, 64 * 1024);
+ r2.destroy();
+
+ // Explicit undefined options should work (uses defaults)
+ const r3 = toReadable(gen(), undefined);
+ assert.strictEqual(r3.readableHighWaterMark, 64 * 1024);
+ r3.destroy();
+}
+
+// =============================================================================
+// fromStreamIter: with stream/iter transform
+// =============================================================================
+
+async function testWithTransformAsync() {
+ // Uppercase transform
+ function upper(chunks) {
+ if (chunks === null) return null;
+ return chunks.map((c) => {
+ const buf = Buffer.from(c);
+ for (let i = 0; i < buf.length; i++) {
+ if (buf[i] >= 97 && buf[i] <= 122) buf[i] -= 32;
+ }
+ return buf;
+ });
+ }
+
+ const source = pull(from('hello world'), upper);
+ const readable = toReadable(source);
+ const result = await collect(readable);
+ assert.strictEqual(result.toString(), 'HELLO WORLD');
+}
+
+// =============================================================================
+// fromStreamIterSync: basic sync round-trip
+// =============================================================================
+
+async function testBasicSync() {
+ const source = fromSync('sync hello');
+ const readable = toReadableSync(source);
+
+ assert.strictEqual(readable.readableObjectMode, false);
+
+ const result = await collect(readable);
+ assert.strictEqual(result.toString(), 'sync hello');
+}
+
+// =============================================================================
+// fromStreamIterSync: synchronous read() returns data immediately
+// =============================================================================
+
+function testSyncRead() {
+ const source = fromSync('immediate');
+ const readable = toReadableSync(source);
+
+ // Synchronous read should return data right away
+ const chunk = readable.read();
+ assert.ok(Buffer.isBuffer(chunk));
+ assert.strictEqual(chunk.toString(), 'immediate');
+}
+
+// =============================================================================
+// fromStreamIterSync: backpressure with small highWaterMark
+// =============================================================================
+
+async function testBackpressureSync() {
+ function* gen() {
+ for (let i = 0; i < 10; i++) {
+ yield [Buffer.from(`s${i}`)];
+ }
+ }
+
+ const readable = toReadableSync(gen(), { highWaterMark: 1 });
+
+ const chunks = [];
+ for await (const chunk of readable) {
+ chunks.push(chunk);
+ }
+
+ assert.strictEqual(chunks.length, 10);
+}
+
+// =============================================================================
+// fromStreamIterSync: source error
+// =============================================================================
+
+async function testErrorSync() {
+ function* gen() {
+ yield [Buffer.from('ok')];
+ throw new Error('sync source failed');
+ }
+
+ const readable = toReadableSync(gen());
+
+ await assert.rejects(async () => {
+ // eslint-disable-next-line no-unused-vars
+ for await (const chunk of readable) {
+ // Consume until error
+ }
+ }, { message: 'sync source failed' });
+}
+
+// =============================================================================
+// fromStreamIterSync: empty source
+// =============================================================================
+
+function testEmptySync() {
+ function* gen() {
+ // yields nothing
+ }
+
+ const readable = toReadableSync(gen());
+ const result = readable.read();
+ assert.strictEqual(result, null);
+}
+
+// =============================================================================
+// fromStreamIterSync: destroy calls iterator.return()
+// =============================================================================
+
+async function testDestroySync() {
+ let returnCalled = false;
+ function* gen() {
+ try {
+ for (let i = 0; ; i++) {
+ yield [Buffer.from(`chunk${i}`)];
+ }
+ } finally {
+ returnCalled = true;
+ }
+ }
+
+ const readable = toReadableSync(gen());
+ readable.read(); // Start iteration
+ readable.destroy();
+
+ await new Promise((resolve) => readable.on('close', resolve));
+ assert.ok(returnCalled, 'iterator.return() should have been called');
+}
+
+// =============================================================================
+// fromStreamIterSync: not object mode
+// =============================================================================
+
+function testNotObjectModeSync() {
+ function* gen() { yield [Buffer.from('x')]; }
+ const readable = toReadableSync(gen());
+ assert.strictEqual(readable.readableObjectMode, false);
+}
+
+// =============================================================================
+// fromStreamIterSync: invalid source throws
+// =============================================================================
+
+function testInvalidSourceSync() {
+ assert.throws(() => toReadableSync(42),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+ assert.throws(() => toReadableSync(null),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+}
+
+// =============================================================================
+// fromStreamIterSync: options validation
+// =============================================================================
+
+function testOptionsValidationSync() {
+ function* gen() { yield [Buffer.from('x')]; }
+
+ // Options must be an object
+ assert.throws(() => toReadableSync(gen(), 'bad'),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+ assert.throws(() => toReadableSync(gen(), 42),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+
+ // highWaterMark must be a non-negative integer
+ assert.throws(() => toReadableSync(gen(), { highWaterMark: -1 }),
+ { code: 'ERR_OUT_OF_RANGE' });
+ assert.throws(() => toReadableSync(gen(), { highWaterMark: 'big' }),
+ { code: 'ERR_INVALID_ARG_TYPE' });
+ assert.throws(() => toReadableSync(gen(), { highWaterMark: 1.5 }),
+ { code: 'ERR_OUT_OF_RANGE' });
+
+ // Valid options should work
+ const r = toReadableSync(gen(), { highWaterMark: 0 });
+ assert.strictEqual(r.readableHighWaterMark, 0);
+ r.destroy();
+
+  // Default highWaterMark should be 64 KiB
+ const r2 = toReadableSync(gen());
+ assert.strictEqual(r2.readableHighWaterMark, 64 * 1024);
+ r2.destroy();
+}
+
+// =============================================================================
+// Round-trip: stream/iter -> Readable -> stream/iter
+// =============================================================================
+
+async function testRoundTrip() {
+ const original = 'round trip data test';
+
+ // stream/iter -> classic Readable
+ const source = from(original);
+ const readable = toReadable(source);
+
+ // classic Readable -> stream/iter (via toAsyncStreamable)
+ const result = await text(from(readable));
+ assert.strictEqual(result, original);
+}
+
+// =============================================================================
+// Round-trip with compression
+// =============================================================================
+
+async function testRoundTripWithCompression() {
+ const { compressGzip, decompressGzip } = require('zlib/iter');
+
+ const original = 'compress through classic readable';
+
+ // Compress via stream/iter, bridge to classic Readable
+ const compressed = pull(from(original), compressGzip());
+ const readable = toReadable(compressed);
+
+ // Classic Readable back to stream/iter for decompression
+ const result = await text(pull(from(readable), decompressGzip()));
+ assert.strictEqual(result, original);
+}
+
+// =============================================================================
+// Run all tests
+// =============================================================================
+
+testNotObjectMode();
+testNotObjectModeSync();
+testSyncRead();
+testEmptySync();
+testInvalidSourceAsync();
+testInvalidSourceSync();
+testOptionsValidationAsync();
+testOptionsValidationSync();
+
+Promise.all([
+ testBasicAsync(),
+ testMultiBatchAsync(),
+ testBackpressureAsync(),
+ testErrorAsync(),
+ testEmptyAsync(),
+ testEmptyBatchAsync(),
+ testDestroyAsync(),
+ testDestroyDuringBackpressure(),
+ testSignalAsync(),
+ testSignalAlreadyAborted(),
+ testLargeDataAsync(),
+ testPipeAsync(),
+ testWithTransformAsync(),
+ testBasicSync(),
+ testBackpressureSync(),
+ testErrorSync(),
+ testDestroySync(),
+ testRoundTrip(),
+ testRoundTripWithCompression(),
+]).then(common.mustCall());
diff --git a/test/parallel/test-stream-iter-writable-from.js b/test/parallel/test-stream-iter-writable-from.js
new file mode 100644
index 00000000000000..fd922c5cf99537
--- /dev/null
+++ b/test/parallel/test-stream-iter-writable-from.js
@@ -0,0 +1,596 @@
+// Flags: --experimental-stream-iter
+'use strict';
+
+// Tests for toWritable() - creating a classic stream.Writable
+// backed by a stream/iter Writer.
+
+const common = require('../common');
+const assert = require('assert');
+const {
+ push,
+ text,
+ toWritable,
+} = require('stream/iter');
+
+// =============================================================================
+// Basic: write through fromStreamIter writable, read from readable
+// =============================================================================
+
+async function testBasicWrite() {
+ const { writer, readable } = push({ backpressure: 'block' });
+ const writable = toWritable(writer);
+
+ writable.write('hello');
+ writable.write(' world');
+ writable.end();
+
+ const result = await text(readable);
+ assert.strictEqual(result, 'hello world');
+}
+
+// =============================================================================
+// _write delegates to writer.write()
+// =============================================================================
+
+async function testWriteDelegatesToWriter() {
+ const chunks = [];
+ // Create a minimal Writer that records writes.
+ const writer = {
+ write(chunk) {
+ chunks.push(Buffer.from(chunk));
+ return Promise.resolve();
+ },
+ end() { return Promise.resolve(0); },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ await new Promise((resolve, reject) => {
+ writable.write('hello', (err) => {
+ if (err) reject(err);
+ else resolve();
+ });
+ });
+
+ assert.strictEqual(Buffer.concat(chunks).toString(), 'hello');
+}
+
+// =============================================================================
+// _writev delegates to writer.writev() when available
+// =============================================================================
+
+async function testWritevDelegation() {
+ const batches = [];
+ const writer = {
+ write(chunk) {
+ return Promise.resolve();
+ },
+ writev(chunks) {
+ batches.push(chunks.map((c) => Buffer.from(c)));
+ return Promise.resolve();
+ },
+ writevSync(chunks) {
+ return false;
+ },
+ end() { return Promise.resolve(0); },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ // Cork to batch writes, then uncork to trigger _writev
+ writable.cork();
+ writable.write('a');
+ writable.write('b');
+ writable.write('c');
+ writable.uncork();
+
+ await new Promise((resolve) => writable.end(resolve));
+
+ // Writev should have been called with the batched chunks
+ assert.ok(batches.length > 0, 'writev should have been called');
+}
+
+// =============================================================================
+// _writev not defined when writer lacks writev
+// =============================================================================
+
+function testNoWritevWithoutWriterWritev() {
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ };
+
+ const writable = toWritable(writer);
+ // The _writev should be null (Writable default) when writer lacks writev
+ assert.strictEqual(writable._writev, null);
+}
+
+// =============================================================================
+// Try-sync-first: writeSync is attempted before write
+// =============================================================================
+
+async function testWriteSyncFirst() {
+ let syncCalled = false;
+ let asyncCalled = false;
+
+ const writer = {
+ writeSync(chunk) {
+ syncCalled = true;
+ return true; // Sync path accepted
+ },
+ write(chunk) {
+ asyncCalled = true;
+ return Promise.resolve();
+ },
+ end() { return Promise.resolve(0); },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ await new Promise((resolve) => {
+ writable.write('test', resolve);
+ });
+
+ assert.ok(syncCalled, 'writeSync should have been called');
+ assert.ok(!asyncCalled, 'write should not have been called');
+}
+
+// =============================================================================
+// Try-sync-first: falls back to async when writeSync returns false
+// =============================================================================
+
+async function testWriteSyncFallback() {
+ let syncCalled = false;
+ let asyncCalled = false;
+
+ const writer = {
+ writeSync(chunk) {
+ syncCalled = true;
+ return false; // Sync path rejected
+ },
+ write(chunk) {
+ asyncCalled = true;
+ return Promise.resolve();
+ },
+ end() { return Promise.resolve(0); },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ await new Promise((resolve) => {
+ writable.write('test', resolve);
+ });
+
+ assert.ok(syncCalled, 'writeSync should have been called');
+ assert.ok(asyncCalled, 'write should have been called as fallback');
+}
+
+// =============================================================================
+// Try-sync-first: endSync attempted before end
+// =============================================================================
+
+async function testEndSyncFirst() {
+ let endSyncCalled = false;
+ let endAsyncCalled = false;
+
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ endSync() {
+ endSyncCalled = true;
+ return 5; // Success, returns byte count
+ },
+ end() {
+ endAsyncCalled = true;
+ return Promise.resolve(5);
+ },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ await new Promise((resolve) => writable.end(resolve));
+
+ assert.ok(endSyncCalled, 'endSync should have been called');
+ assert.ok(!endAsyncCalled, 'end should not have been called');
+}
+
+// =============================================================================
+// Try-sync-first: endSync returns -1, falls back to async end
+// =============================================================================
+
+async function testEndSyncFallback() {
+ let endSyncCalled = false;
+ let endAsyncCalled = false;
+
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ endSync() {
+ endSyncCalled = true;
+ return -1; // Can't complete synchronously
+ },
+ end() {
+ endAsyncCalled = true;
+ return Promise.resolve(0);
+ },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ await new Promise((resolve) => writable.end(resolve));
+
+ assert.ok(endSyncCalled, 'endSync should have been called');
+ assert.ok(endAsyncCalled, 'end should have been called as fallback');
+}
+
+// =============================================================================
+// _final delegates to writer.end()
+// =============================================================================
+
+async function testFinalDelegatesToEnd() {
+ let endCalled = false;
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ end() {
+ endCalled = true;
+ return Promise.resolve(0);
+ },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ await new Promise((resolve) => writable.end(resolve));
+
+ assert.ok(endCalled, 'writer.end() should have been called');
+}
+
+// =============================================================================
+// _destroy delegates to writer.fail()
+// =============================================================================
+
+async function testDestroyDelegatesToFail() {
+ let failReason = null;
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ end() { return Promise.resolve(0); },
+ fail(reason) { failReason = reason; },
+ };
+
+ const writable = toWritable(writer);
+ writable.on('error', () => {}); // Prevent unhandled
+
+ const testErr = new Error('destroy test');
+ writable.destroy(testErr);
+
+  // Give destroy a moment to propagate to the writer
+ await new Promise((resolve) => setTimeout(resolve, 10));
+
+ assert.strictEqual(failReason, testErr);
+}
+
+// =============================================================================
+// Error from writer.write() propagates to writable
+// =============================================================================
+
+async function testWriteErrorPropagation() {
+ const writer = {
+ write(chunk) {
+ return Promise.reject(new Error('write failed'));
+ },
+ end() { return Promise.resolve(0); },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ await assert.rejects(new Promise((resolve, reject) => {
+ writable.write('data', (err) => {
+ if (err) reject(err);
+ else resolve();
+ });
+ }), { message: 'write failed' });
+}
+
+// =============================================================================
+// Invalid writer argument throws
+// =============================================================================
+
+function testInvalidWriterThrows() {
+ assert.throws(
+ () => toWritable(null),
+ { code: 'ERR_INVALID_ARG_TYPE' },
+ );
+ assert.throws(
+ () => toWritable({}),
+ { code: 'ERR_INVALID_ARG_TYPE' },
+ );
+ assert.throws(
+ () => toWritable('not a writer'),
+ { code: 'ERR_INVALID_ARG_TYPE' },
+ );
+ // Object with write is valid (only write is required).
+ // This should not throw.
+ toWritable({
+ write() { return Promise.resolve(); },
+ });
+}
+
+// =============================================================================
+// Round-trip: push writer -> fromStreamIter -> write -> read from readable
+// =============================================================================
+
+async function testRoundTrip() {
+ const { writer, readable } = push({ backpressure: 'block' });
+ const writable = toWritable(writer);
+
+ const data = 'round trip test data';
+ writable.write(data);
+ writable.end();
+
+ const result = await text(readable);
+ assert.strictEqual(result, data);
+}
+
+// =============================================================================
+// Multiple sequential writes
+// =============================================================================
+
+async function testSequentialWrites() {
+ const { writer, readable } = push({ backpressure: 'block' });
+ const writable = toWritable(writer);
+
+ for (let i = 0; i < 10; i++) {
+ writable.write(`chunk${i}`);
+ }
+ writable.end();
+
+ let expected = '';
+ for (let i = 0; i < 10; i++) {
+ expected += `chunk${i}`;
+ }
+ const result = await text(readable);
+ assert.strictEqual(result, expected);
+}
+
+// =============================================================================
+// Sync callback is deferred via queueMicrotask
+// =============================================================================
+
+async function testSyncCallbackDeferred() {
+ let callbackTick = false;
+
+ const writer = {
+ writeSync(chunk) {
+ return true;
+ },
+ write(chunk) {
+ return Promise.resolve();
+ },
+ end() { return Promise.resolve(0); },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ const p = new Promise((resolve) => {
+ writable.write('test', () => {
+ callbackTick = true;
+ resolve();
+ });
+ // Callback should NOT have fired synchronously
+ assert.strictEqual(callbackTick, false);
+ });
+
+ await p;
+ assert.strictEqual(callbackTick, true);
+}
+
+// =============================================================================
+// Minimal writer: only write() is required
+// =============================================================================
+
+async function testMinimalWriter() {
+ const chunks = [];
+ const writer = {
+ write(chunk) {
+ chunks.push(Buffer.from(chunk));
+ return Promise.resolve();
+ },
+ // No end, fail, writeSync, writev, etc.
+ };
+
+ const writable = toWritable(writer);
+
+ await new Promise((resolve) => {
+ writable.write('minimal');
+ writable.end(resolve);
+ });
+
+ assert.strictEqual(Buffer.concat(chunks).toString(), 'minimal');
+}
+
+// =============================================================================
+// Destroy without error does not call fail()
+// =============================================================================
+
+async function testDestroyWithoutError() {
+ let failCalled = false;
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ fail() { failCalled = true; },
+ };
+
+ const writable = toWritable(writer);
+ writable.destroy();
+
+ await new Promise((resolve) => setTimeout(resolve, 10));
+
+ assert.ok(!failCalled, 'fail should not be called on clean destroy');
+}
+
+// =============================================================================
+// Destroy with error calls fail() when available
+// =============================================================================
+
+async function testDestroyWithError() {
+ let failReason = null;
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ fail(reason) { failReason = reason; },
+ };
+
+ const writable = toWritable(writer);
+ writable.on('error', () => {});
+
+ const err = new Error('test');
+ writable.destroy(err);
+
+ await new Promise((resolve) => setTimeout(resolve, 10));
+
+ assert.strictEqual(failReason, err);
+}
+
+// =============================================================================
+// Destroy with error when writer lacks fail()
+// =============================================================================
+
+async function testDestroyWithoutFail() {
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ // No fail method
+ };
+
+ const writable = toWritable(writer);
+ writable.on('error', () => {});
+
+ // Should not throw even though writer has no fail()
+ writable.destroy(new Error('test'));
+
+ await new Promise((resolve) => setTimeout(resolve, 10));
+ assert.ok(writable.destroyed);
+}
+
+// =============================================================================
+// Custom highWaterMark option
+// =============================================================================
+
+function testHighWaterMarkIsMaxSafeInt() {
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ };
+
+ // HWM is set to MAX_SAFE_INTEGER to disable Writable's internal
+ // buffering. The underlying Writer manages backpressure directly.
+ const writable = toWritable(writer);
+ assert.strictEqual(writable.writableHighWaterMark, Number.MAX_SAFE_INTEGER);
+}
+
+// =============================================================================
+// writeSync throws -- falls back to async
+// =============================================================================
+
+async function testWriteSyncThrowsFallback() {
+ let asyncCalled = false;
+
+ const writer = {
+ writeSync() {
+ throw new Error('sync broken');
+ },
+ write(chunk) {
+ asyncCalled = true;
+ return Promise.resolve();
+ },
+ end() { return Promise.resolve(0); },
+ fail() {},
+ };
+
+ const writable = toWritable(writer);
+
+ await new Promise((resolve) => {
+ writable.write('test', resolve);
+ });
+
+ assert.ok(asyncCalled, 'async write should be called as fallback');
+}
+
+// =============================================================================
+// writer.write() throws synchronously -- error propagates to callback
+// =============================================================================
+
+async function testWriteThrowsSyncPropagation() {
+ const writer = {
+ write() {
+ throw new Error('sync throw from write');
+ },
+ };
+
+ const writable = toWritable(writer);
+
+ await assert.rejects(new Promise((resolve, reject) => {
+ writable.write('data', (err) => {
+ if (err) reject(err);
+ else resolve();
+ });
+ }), { message: 'sync throw from write' });
+}
+
+// =============================================================================
+// writer.end() throws synchronously -- error propagates to callback
+// =============================================================================
+
+async function testEndThrowsSyncPropagation() {
+ const writer = {
+ write(chunk) { return Promise.resolve(); },
+ endSync() { return -1; },
+ end() {
+ throw new Error('sync throw from end');
+ },
+ };
+
+ const writable = toWritable(writer);
+ writable.on('error', () => {});
+
+ await new Promise((resolve) => {
+ writable.end(common.mustCall((err) => {
+ assert.ok(err);
+ assert.strictEqual(err.message, 'sync throw from end');
+ resolve();
+ }));
+ });
+}
+
+// =============================================================================
+// Run all tests
+// =============================================================================
+
+// Synchronous tests run first; they throw directly on failure.
+testInvalidWriterThrows();
+testNoWritevWithoutWriterWritev();
+testHighWaterMarkIsMaxSafeInt();
+
+// Async tests run concurrently; common.mustCall() verifies the combined
+// promise actually settles before the process exits.
+Promise.all([
+  testBasicWrite(),
+  testWriteDelegatesToWriter(),
+  testWritevDelegation(),
+  testWriteSyncFirst(),
+  testWriteSyncFallback(),
+  testWriteSyncThrowsFallback(),
+  testEndSyncFirst(),
+  testEndSyncFallback(),
+  testFinalDelegatesToEnd(),
+  testDestroyDelegatesToFail(),
+  testDestroyWithoutError(),
+  testDestroyWithError(),
+  testDestroyWithoutFail(),
+  testWriteErrorPropagation(),
+  testWriteThrowsSyncPropagation(),
+  testEndThrowsSyncPropagation(),
+  testRoundTrip(),
+  testSequentialWrites(),
+  testSyncCallbackDeferred(),
+  testMinimalWriter(),
+]).then(common.mustCall());
diff --git a/test/parallel/test-stream-iter-writable-interop.js b/test/parallel/test-stream-iter-writable-interop.js
new file mode 100644
index 00000000000000..8a2ead0d0ee579
--- /dev/null
+++ b/test/parallel/test-stream-iter-writable-interop.js
@@ -0,0 +1,667 @@
+// Flags: --experimental-stream-iter
+'use strict';
+
+// Tests for classic Writable stream interop with the stream/iter API
+// via fromWritable().
+
+const common = require('../common');
+const assert = require('assert');
+const { Writable } = require('stream');
+const {
+ from,
+ fromWritable,
+ pipeTo,
+ text,
+ ondrain,
+} = require('stream/iter');
+
+// =============================================================================
+// fromWritable() is exported from stream/iter
+// =============================================================================
+
+function testFunctionExists() {
+  // The adapter factory must be part of the stream/iter public surface.
+  assert.strictEqual(typeof fromWritable, 'function');
+}
+
+// =============================================================================
+// Default policy is strict
+// =============================================================================
+
+async function testDefaultIsStrict() {
+ const writable = new Writable({
+ highWaterMark: 1024,
+ write(chunk, encoding, cb) { cb(); },
+ });
+
+ const writer = fromWritable(writable);
+ // Should work fine when buffer has room
+ await writer.write('hello');
+ await writer.end();
+}
+
+// =============================================================================
+// Basic write: pipeTo through the adapter (block policy for pipeTo compat)
+// =============================================================================
+
+async function testBasicWrite() {
+ const chunks = [];
+ const writable = new Writable({
+ write(chunk, encoding, cb) {
+ chunks.push(Buffer.from(chunk));
+ cb();
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'block' });
+ await pipeTo(from('hello world'), writer);
+
+ assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world');
+}
+
+// =============================================================================
+// write() resolves when no backpressure (strict)
+// =============================================================================
+
+async function testWriteNoDrain() {
+ const chunks = [];
+ const writable = new Writable({
+ highWaterMark: 1024,
+ write(chunk, encoding, cb) {
+ chunks.push(Buffer.from(chunk));
+ cb();
+ },
+ });
+
+ const writer = fromWritable(writable);
+ await writer.write('hello');
+ await writer.write(' world');
+ await writer.end();
+
+ assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world');
+}
+
+// =============================================================================
+// block: write() waits for drain when backpressure is active
+// =============================================================================
+
+async function testBlockWaitsForDrain() {
+ const chunks = [];
+ const writable = new Writable({
+ highWaterMark: 1, // Very small buffer
+ write(chunk, encoding, cb) {
+ chunks.push(Buffer.from(chunk));
+ // Delay callback to simulate slow consumer
+ setTimeout(cb, 10);
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'block' });
+
+ await writer.write('a');
+ await writer.write('b');
+ await writer.write('c');
+ await writer.end();
+
+ assert.strictEqual(Buffer.concat(chunks).toString(), 'abc');
+}
+
+// =============================================================================
+// block: stream error rejects pending write
+// =============================================================================
+
+async function testBlockErrorRejectsPendingWrite() {
+ const writable = new Writable({
+ highWaterMark: 1,
+ write(chunk, enc, cb) {
+ // Never call cb -- simulate stuck write
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'block' });
+
+ // First write fills the buffer, waits for drain
+ const writePromise = writer.write('data that will block');
+
+ // Destroy with error while write is pending
+ writable.destroy(new Error('stream broke'));
+
+ await assert.rejects(writePromise, { message: 'stream broke' });
+}
+
+// =============================================================================
+// strict: rejects when buffer is full
+// =============================================================================
+
+async function testStrictRejectsWhenFull() {
+ const writable = new Writable({
+ highWaterMark: 5,
+ write(chunk, enc, cb) {
+ // Never call cb -- data stays buffered
+ },
+ });
+
+ const writer = fromWritable(writable);
+
+ // First write fills the buffer (5 bytes = hwm)
+ await writer.write('12345');
+
+ // Second write should reject -- buffer is full
+ await assert.rejects(
+ writer.write('more'),
+ { code: 'ERR_INVALID_STATE' },
+ );
+}
+
+// =============================================================================
+// strict: writev rejects when buffer is full
+// =============================================================================
+
+async function testStrictWritevRejectsWhenFull() {
+ const writable = new Writable({
+ highWaterMark: 5,
+ write(chunk, enc, cb) {
+ // Never call cb
+ },
+ });
+
+ const writer = fromWritable(writable);
+
+ // Fill buffer
+ await writer.write('12345');
+
+ // Writev should reject entire batch
+ await assert.rejects(
+ writer.writev([
+ new TextEncoder().encode('a'),
+ new TextEncoder().encode('b'),
+ ]),
+ { code: 'ERR_INVALID_STATE' },
+ );
+}
+
+// =============================================================================
+// drop-newest: silently discards when buffer is full
+// =============================================================================
+
+async function testDropNewestDiscards() {
+ const chunks = [];
+ const writable = new Writable({
+ highWaterMark: 5,
+ write(chunk, enc, cb) {
+ chunks.push(Buffer.from(chunk));
+ // Never call cb -- data stays buffered
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'drop-newest' });
+
+ // First write fills the buffer
+ await writer.write('12345');
+
+ // Second write should be silently discarded (no reject, no block)
+ await writer.write('dropped');
+
+ // Only the first chunk was actually written to the writable
+ assert.strictEqual(chunks.length, 1);
+ assert.strictEqual(chunks[0].toString(), '12345');
+}
+
+// =============================================================================
+// drop-newest: writev discards entire batch when full
+// =============================================================================
+
+async function testDropNewestWritevDiscards() {
+ const chunks = [];
+ const writable = new Writable({
+ highWaterMark: 5,
+ write(chunk, enc, cb) {
+ chunks.push(Buffer.from(chunk));
+ // Never call cb
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'drop-newest' });
+
+ // Fill buffer
+ await writer.write('12345');
+
+ // Writev should discard entire batch
+ await writer.writev([
+ new TextEncoder().encode('a'),
+ new TextEncoder().encode('b'),
+ ]);
+
+ assert.strictEqual(chunks.length, 1);
+}
+
+// =============================================================================
+// drop-newest: still counts bytes from dropped writes
+// =============================================================================
+
+async function testDropNewestCountsBytes() {
+ const writable = new Writable({
+ highWaterMark: 5,
+ write(chunk, enc, cb) {
+ // Never call cb
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'drop-newest' });
+
+ await writer.write('12345'); // 5 bytes, accepted
+ await writer.write('67890'); // 5 bytes, dropped
+
+ // desiredSize should be 0 (buffer is full)
+ assert.strictEqual(writer.desiredSize, 0);
+}
+
+// =============================================================================
+// drop-oldest: throws on construction
+// =============================================================================
+
+function testDropOldestThrows() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ assert.throws(
+ () => fromWritable(writable, { backpressure: 'drop-oldest' }),
+ { code: 'ERR_INVALID_ARG_VALUE' },
+ );
+}
+
+// =============================================================================
+// Invalid backpressure value throws
+// =============================================================================
+
+function testInvalidBackpressureThrows() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ assert.throws(
+ () => fromWritable(writable, { backpressure: 'invalid' }),
+ { code: 'ERR_INVALID_ARG_VALUE' },
+ );
+}
+
+// =============================================================================
+// writev() corks and uncorks (block policy)
+// =============================================================================
+
+async function testWritev() {
+ const chunks = [];
+ const writable = new Writable({
+ highWaterMark: 1024,
+ write(chunk, encoding, cb) {
+ chunks.push(Buffer.from(chunk));
+ cb();
+ },
+ writev(entries, cb) {
+ for (const { chunk } of entries) {
+ chunks.push(Buffer.from(chunk));
+ }
+ cb();
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'block' });
+ await writer.writev([
+ new TextEncoder().encode('hello'),
+ new TextEncoder().encode(' '),
+ new TextEncoder().encode('world'),
+ ]);
+ await writer.end();
+
+ assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world');
+}
+
+// =============================================================================
+// writeSync / writevSync always return false
+// =============================================================================
+
+function testSyncMethodsReturnFalse() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ assert.strictEqual(writer.writeSync(new Uint8Array(1)), false);
+ assert.strictEqual(writer.writevSync([new Uint8Array(1)]), false);
+}
+
+// =============================================================================
+// endSync returns -1
+// =============================================================================
+
+function testEndSyncReturnsNegativeOne() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ assert.strictEqual(writer.endSync(), -1);
+}
+
+// =============================================================================
+// end() resolves with total bytes written
+// =============================================================================
+
+async function testEndReturnsByteCount() {
+ const writable = new Writable({
+ write(chunk, encoding, cb) { cb(); },
+ });
+
+ const writer = fromWritable(writable);
+ await writer.write('hello'); // 5 bytes
+ await writer.write(' world'); // 6 bytes
+ const total = await writer.end();
+
+ assert.strictEqual(total, 11);
+}
+
+// =============================================================================
+// fail() destroys the writable
+// =============================================================================
+
+async function testFail() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ writable.on('error', () => {}); // Prevent unhandled error
+ const writer = fromWritable(writable);
+
+ writer.fail(new Error('test fail'));
+
+ assert.ok(writable.destroyed);
+}
+
+// =============================================================================
+// desiredSize reflects buffer state
+// =============================================================================
+
+function testDesiredSize() {
+ const writable = new Writable({
+ highWaterMark: 100,
+ write(chunk, enc, cb) {
+ // Don't call cb - keeps data buffered
+ },
+ });
+
+ const writer = fromWritable(writable);
+ assert.strictEqual(writer.desiredSize, 100);
+}
+
+// =============================================================================
+// desiredSize is null when destroyed
+// =============================================================================
+
+function testDesiredSizeNull() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ writable.destroy();
+ assert.strictEqual(writer.desiredSize, null);
+}
+
+// =============================================================================
+// drainableProtocol: resolves immediately when no backpressure
+// =============================================================================
+
+async function testDrainableNoPressure() {
+ const writable = new Writable({
+ highWaterMark: 1024,
+ write(chunk, enc, cb) { cb(); },
+ });
+
+ const writer = fromWritable(writable);
+ const result = await ondrain(writer);
+ assert.strictEqual(result, true);
+}
+
+// =============================================================================
+// drainableProtocol: returns null when destroyed
+// =============================================================================
+
+function testDrainableNull() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ writable.destroy();
+ assert.strictEqual(ondrain(writer), null);
+}
+
+// =============================================================================
+// Error propagation: write after end rejects
+// =============================================================================
+
+async function testWriteAfterEnd() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ await writer.end();
+
+ await assert.rejects(
+ writer.write('should fail'),
+ { code: 'ERR_STREAM_WRITE_AFTER_END' },
+ );
+}
+
+// =============================================================================
+// Multiple sequential writes
+// =============================================================================
+
+async function testSequentialWrites() {
+ const chunks = [];
+ const writable = new Writable({
+ write(chunk, encoding, cb) {
+ chunks.push(Buffer.from(chunk));
+ cb();
+ },
+ });
+
+ const writer = fromWritable(writable);
+
+ for (let i = 0; i < 10; i++) {
+ await writer.write(`chunk${i}`);
+ }
+ await writer.end();
+
+ let expected = '';
+ for (let i = 0; i < 10; i++) {
+ expected += `chunk${i}`;
+ }
+ assert.strictEqual(Buffer.concat(chunks).toString(), expected);
+}
+
+// =============================================================================
+// pipeTo with compression transform into writable (block policy)
+// =============================================================================
+
+async function testPipeToWithTransform() {
+ const {
+ compressGzip,
+ decompressGzip,
+ } = require('zlib/iter');
+ const { pull } = require('stream/iter');
+
+ const compressed = [];
+ const writable = new Writable({
+ write(chunk, encoding, cb) {
+ compressed.push(Buffer.from(chunk));
+ cb();
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'block' });
+ await pipeTo(from('hello via transform'), compressGzip(), writer);
+
+ const decompressed = await text(
+ pull(from(Buffer.concat(compressed)), decompressGzip()),
+ );
+ assert.strictEqual(decompressed, 'hello via transform');
+}
+
+// =============================================================================
+// Dispose support
+// =============================================================================
+
+async function testDispose() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ writer[Symbol.dispose]();
+ assert.ok(writable.destroyed);
+}
+
+async function testAsyncDispose() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ await writer[Symbol.asyncDispose]();
+ assert.ok(writable.destroyed);
+}
+
+// =============================================================================
+// write() validates chunk type
+// =============================================================================
+
+async function testWriteInvalidChunkType() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ await assert.rejects(
+ writer.write(42),
+ { code: 'ERR_INVALID_ARG_TYPE' },
+ );
+ await assert.rejects(
+ writer.write(null),
+ { code: 'ERR_INVALID_ARG_TYPE' },
+ );
+ await assert.rejects(
+ writer.write({}),
+ { code: 'ERR_INVALID_ARG_TYPE' },
+ );
+}
+
+// =============================================================================
+// writev() validates chunks is an array
+// =============================================================================
+
+function testWritevInvalidChunksType() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer = fromWritable(writable);
+
+ assert.throws(
+ () => writer.writev('not an array'),
+ { code: 'ERR_INVALID_ARG_TYPE' },
+ );
+ assert.throws(
+ () => writer.writev(42),
+ { code: 'ERR_INVALID_ARG_TYPE' },
+ );
+}
+
+// =============================================================================
+// Cached writer: second call returns same instance
+// =============================================================================
+
+function testCachedWriter() {
+ const writable = new Writable({ write(chunk, enc, cb) { cb(); } });
+ const writer1 = fromWritable(writable);
+ const writer2 = fromWritable(writable);
+
+ assert.strictEqual(writer1, writer2);
+}
+
+// =============================================================================
+// fail() rejects pending block waiters
+// =============================================================================
+
+async function testFailRejectsPendingWaiters() {
+ const writable = new Writable({
+ highWaterMark: 1,
+ write(chunk, enc, cb) {
+ // Never call cb -- stuck
+ },
+ });
+ writable.on('error', () => {}); // Prevent unhandled error
+
+ const writer = fromWritable(writable, { backpressure: 'block' });
+
+ // This write will block on drain
+ const writePromise = writer.write('blocked data');
+
+ // fail() should reject the pending waiter, not orphan it
+ writer.fail(new Error('fail reason'));
+
+ await assert.rejects(writePromise, { message: 'fail reason' });
+}
+
+// =============================================================================
+// dispose rejects pending block waiters
+// =============================================================================
+
+async function testDisposeRejectsPendingWaiters() {
+ const writable = new Writable({
+ highWaterMark: 1,
+ write(chunk, enc, cb) {
+ // Never call cb -- stuck
+ },
+ });
+
+ const writer = fromWritable(writable, { backpressure: 'block' });
+
+ // This write will block on drain
+ const writePromise = writer.write('blocked data');
+
+ writer[Symbol.dispose]();
+
+ await assert.rejects(writePromise, { name: 'AbortError' });
+}
+
+// =============================================================================
+// Run all tests
+// =============================================================================
+
+testFunctionExists();
+testSyncMethodsReturnFalse();
+// =============================================================================
+// Object-mode Writable throws
+// =============================================================================
+
+function testObjectModeThrows() {
+ const writable = new Writable({
+ objectMode: true,
+ write(chunk, enc, cb) { cb(); },
+ });
+ assert.throws(
+ () => fromWritable(writable),
+ { code: 'ERR_INVALID_STATE' },
+ );
+}
+
+testFunctionExists();
+testSyncMethodsReturnFalse();
+testEndSyncReturnsNegativeOne();
+testDesiredSize();
+testDesiredSizeNull();
+testDrainableNull();
+testDropOldestThrows();
+testInvalidBackpressureThrows();
+testWritevInvalidChunksType();
+testCachedWriter();
+testObjectModeThrows();
+
+// Async tests run concurrently; common.mustCall() verifies that the
+// combined promise actually settles before the process exits.
+Promise.all([
+  testDefaultIsStrict(),
+  testBasicWrite(),
+  testWriteNoDrain(),
+  testBlockWaitsForDrain(),
+  testBlockErrorRejectsPendingWrite(),
+  testStrictRejectsWhenFull(),
+  testStrictWritevRejectsWhenFull(),
+  testDropNewestDiscards(),
+  testDropNewestWritevDiscards(),
+  testDropNewestCountsBytes(),
+  testWritev(),
+  testEndReturnsByteCount(),
+  testFail(),
+  testDrainableNoPressure(),
+  testWriteAfterEnd(),
+  testSequentialWrites(),
+  testPipeToWithTransform(),
+  testDispose(),
+  testAsyncDispose(),
+  testWriteInvalidChunkType(),
+  testFailRejectsPendingWaiters(),
+  testDisposeRejectsPendingWaiters(),
+]).then(common.mustCall());