diff --git a/doc/api/errors.md b/doc/api/errors.md index 65ef2ce7bf5d01..98073d49d62098 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream. A stream method was called that cannot complete because the stream was destroyed using `stream.destroy()`. + + +### `ERR_STREAM_ITER_MISSING_FLAG` + +A stream/iter API was used without the `--experimental-stream-iter` CLI flag +enabled. + ### `ERR_STREAM_NULL_VALUES` diff --git a/doc/api/stream.md b/doc/api/stream.md index 65afeaad6306e0..20892fb6cb0a87 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1998,6 +1998,61 @@ option. In the code example above, data will be in a single chunk if the file has less then 64 KiB of data because no `highWaterMark` option is provided to [`fs.createReadStream()`][]. +##### `readable[Symbol.for('Stream.toAsyncStreamable')]()` + + + +> Stability: 1 - Experimental + +* Returns: {AsyncIterable} An `AsyncIterable` that yields + batched chunks from the stream. + +When the `--experimental-stream-iter` flag is enabled, `Readable` streams +implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient +consumption by the [`stream/iter`][] API. + +This provides a batched async iterator that drains the stream's internal +buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead +of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks +are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses). +For object-mode or encoded streams, each chunk is normalized to `Uint8Array` +before batching. + +The returned iterator is tagged as a validated source, so [`from()`][stream-iter-from] +passes it through without additional normalization. + +```mjs +import { Readable } from 'node:stream'; +import { text, from } from 'node:stream/iter'; + +const readable = new Readable({ + read() { this.push('hello'); this.push(null); }, +}); + +// Readable is automatically consumed via toAsyncStreamable +console.log(await text(from(readable))); // 'hello' +``` + +```cjs +const { Readable } = require('node:stream'); +const { text, from } = require('node:stream/iter'); + +async function run() { + const readable = new Readable({ + read() { this.push('hello'); this.push(null); }, + }); + + console.log(await text(from(readable))); // 'hello' +} + +run().catch(console.error); +``` + +Without the `--experimental-stream-iter` flag, calling this method throws +[`ERR_STREAM_ITER_MISSING_FLAG`][]. + ##### `readable[Symbol.asyncDispose]()` + +> Stability: 1 - Experimental + +* `readable` {stream.Readable|Object} A classic Readable stream or any object + with `read()` and `on()` methods. +* Returns: {AsyncIterable\} A stream/iter async iterable source. + +Converts a classic Readable stream (or duck-typed equivalent) into a +stream/iter async iterable source that can be passed to [`from()`][], +[`pull()`][], [`text()`][], etc. + +If the object implements the [`toAsyncStreamable`][] protocol (as +`stream.Readable` does), that protocol is used. Otherwise, the function +duck-types on `read()` and `on()` (EventEmitter) and wraps the stream with +a batched async iterator. + +The result is cached per instance -- calling `fromReadable()` twice with the +same stream returns the same iterable. + +For object-mode or encoded Readable streams, chunks are automatically +normalized to `Uint8Array`. + +```mjs +import { Readable } from 'node:stream'; +import { fromReadable, text } from 'node:stream/iter'; + +const readable = new Readable({ + read() { this.push('hello world'); this.push(null); }, +}); + +const result = await text(fromReadable(readable)); +console.log(result); // 'hello world' +``` + +```cjs +const { Readable } = require('node:stream'); +const { fromReadable, text } = require('node:stream/iter'); + +const readable = new Readable({ + read() { this.push('hello world'); this.push(null); }, +}); + +async function run() { + const result = await text(fromReadable(readable)); + console.log(result); // 'hello world' +} +run(); +``` + +### `fromWritable(writable[, options])` + + + +> Stability: 1 - Experimental + +* `writable` {stream.Writable|Object} A classic Writable stream or any object + with `write()` and `on()` methods. +* `options` {Object} + * `backpressure` {string} Backpressure policy. **Default:** `'strict'`. + * `'strict'` -- writes are rejected when the buffer is full. Catches + callers that ignore backpressure. + * `'block'` -- writes wait for drain when the buffer is full. Recommended + for use with [`pipeTo()`][]. + * `'drop-newest'` -- writes are silently discarded when the buffer is full. + * `'drop-oldest'` -- **not supported**. Throws `ERR_INVALID_ARG_VALUE`. +* Returns: {Object} A stream/iter Writer adapter. + +Creates a stream/iter Writer adapter from a classic Writable stream (or +duck-typed equivalent). The adapter can be passed to [`pipeTo()`][] as a +destination. + +Since all writes on a classic Writable are fundamentally asynchronous, +the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always +return `false` or `-1`, deferring to the async path. The per-write +`options.signal` parameter from the Writer interface is also ignored. + +The result is cached per instance -- calling `fromWritable()` twice with the +same stream returns the same Writer. + +For duck-typed streams that do not expose `writableHighWaterMark`, +`writableLength`, or similar properties, sensible defaults are used. +Object-mode writables (if detectable) are rejected since the Writer +interface is bytes-only. + +```mjs +import { Writable } from 'node:stream'; +import { from, fromWritable, pipeTo } from 'node:stream/iter'; + +const writable = new Writable({ + write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); }, +}); + +await pipeTo(from('hello world'), + fromWritable(writable, { backpressure: 'block' })); +``` + +```cjs +const { Writable } = require('node:stream'); +const { from, fromWritable, pipeTo } = require('node:stream/iter'); + +async function run() { + const writable = new Writable({ + write(chunk, encoding, cb) { console.log(chunk.toString()); cb(); }, + }); + + await pipeTo(from('hello world'), + fromWritable(writable, { backpressure: 'block' })); +} +run(); +``` + +### `toReadable(source[, options])` + + + +> Stability: 1 - Experimental + +* `source` {AsyncIterable} An `AsyncIterable` source, such as + the return value of [`pull()`][] or [`from()`][]. +* `options` {Object} + * `highWaterMark` {number} The internal buffer size in bytes before + backpressure is applied. **Default:** `65536` (64 KB). + * `signal` {AbortSignal} An optional signal to abort the readable. +* Returns: {stream.Readable} + +Creates a byte-mode [`stream.Readable`][] from an `AsyncIterable` +(the native batch format used by the stream/iter API). Each `Uint8Array` in a +yielded batch is pushed as a separate chunk into the Readable. + +```mjs +import { createWriteStream } from 'node:fs'; +import { from, pull, toReadable } from 'node:stream/iter'; +import { compressGzip } from 'node:zlib/iter'; + +const source = pull(from('hello world'), compressGzip()); +const readable = toReadable(source); + +readable.pipe(createWriteStream('output.gz')); +``` + +```cjs +const { createWriteStream } = require('node:fs'); +const { from, pull, toReadable } = require('node:stream/iter'); +const { compressGzip } = require('node:zlib/iter'); + +const source = pull(from('hello world'), compressGzip()); +const readable = toReadable(source); + +readable.pipe(createWriteStream('output.gz')); +``` + +### `toReadableSync(source[, options])` + + + +> Stability: 1 - Experimental + +* `source` {Iterable} An `Iterable` source, such as the + return value of [`pullSync()`][] or [`fromSync()`][]. +* `options` {Object} + * `highWaterMark` {number} The internal buffer size in bytes before + backpressure is applied. **Default:** `65536` (64 KB). +* Returns: {stream.Readable} + +Creates a byte-mode [`stream.Readable`][] from a synchronous +`Iterable`. The `_read()` method pulls from the iterator +synchronously, so data is available immediately via `readable.read()`. + +```mjs +import { fromSync, toReadableSync } from 'node:stream/iter'; + +const source = fromSync('hello world'); +const readable = toReadableSync(source); + +console.log(readable.read().toString()); // 'hello world' +``` + +```cjs +const { fromSync, toReadableSync } = require('node:stream/iter'); + +const source = fromSync('hello world'); +const readable = toReadableSync(source); + +console.log(readable.read().toString()); // 'hello world' +``` + +### `toWritable(writer)` + + + +> Stability: 1 - Experimental + +* `writer` {Object} A stream/iter Writer. Only the `write()` method is + required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`, + and `writev()` are optional. +* Returns: {stream.Writable} + +Creates a classic [`stream.Writable`][] backed by a stream/iter Writer. + +Each `_write()` / `_writev()` call attempts the Writer's synchronous method +first (`writeSync` / `writevSync`), falling back to the async method if the +sync path returns `false` or throws. Similarly, `_final()` tries `endSync()` +before `end()`. When the sync path succeeds, the callback is deferred via +`queueMicrotask` to preserve the async resolution contract. + +The Writable's `highWaterMark` is set to `Number.MAX_SAFE_INTEGER` to +effectively disable its internal buffering, allowing the underlying Writer +to manage backpressure directly. + +```mjs +import { push, toWritable } from 'node:stream/iter'; + +const { writer, readable } = push(); +const writable = toWritable(writer); + +writable.write('hello'); +writable.end(); +``` + +```cjs +const { push, toWritable } = require('node:stream/iter'); + +const { writer, readable } = push(); +const writable = toWritable(writer); + +writable.write('hello'); +writable.end(); +``` + ## Protocol symbols These well-known symbols allow third-party objects to participate in the @@ -1816,10 +2068,15 @@ console.log(textSync(stream)); // 'hello world' [`arrayBuffer()`]: #arraybuffersource-options [`bytes()`]: #bytessource-options [`from()`]: #frominput +[`fromSync()`]: #fromsyncinput [`node:zlib/iter`]: zlib_iter.md [`node:zlib/iter` documentation]: zlib_iter.md [`pipeTo()`]: #pipetosource-transforms-writer-options [`pull()`]: #pullsource-transforms-options +[`pullSync()`]: #pullsyncsource-transforms-options [`share()`]: #sharesource-options +[`stream.Readable`]: stream.md#class-streamreadable +[`stream.Writable`]: stream.md#class-streamwritable [`tap()`]: #tapcallback [`text()`]: #textsource-options +[`toAsyncStreamable`]: #streamtoasyncstreamable diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 7c4728627731fe..206e2a24716022 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED', Error); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error); +E('ERR_STREAM_ITER_MISSING_FLAG', + 'The stream/iter API requires the --experimental-stream-iter flag', TypeError); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); diff --git a/lib/internal/streams/iter/classic.js b/lib/internal/streams/iter/classic.js new file mode 100644 index 00000000000000..17aa43f0969202 --- /dev/null +++ b/lib/internal/streams/iter/classic.js @@ -0,0 +1,844 @@ +'use strict'; + +// Interop utilities between classic Node.js streams and the stream/iter API. +// +// These are Node.js-specific (not part of the stream/iter spec) and are +// exported from 'stream/iter' as top-level utility functions: +// +// fromReadable(readable) -- classic Readable (or duck-type) -> stream/iter source +// fromWritable(writable, opts) -- classic Writable (or duck-type) -> stream/iter Writer +// toReadable(source, opts) -- stream/iter source -> classic Readable +// toReadableSync(source, opts) -- stream/iter source (sync) -> classic Readable +// toWritable(writer) -- stream/iter Writer -> classic Writable + +const { + ArrayIsArray, + MathMax, + NumberMAX_SAFE_INTEGER, + Promise, + PromisePrototypeThen, + PromiseReject, + PromiseResolve, + PromiseWithResolvers, + SafeWeakMap, + SymbolAsyncDispose, + SymbolAsyncIterator, + SymbolDispose, + SymbolIterator, + TypedArrayPrototypeGetByteLength, +} = primordials; + +const { + AbortError, + aggregateTwoErrors, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_ARG_VALUE, + ERR_INVALID_STATE, + ERR_STREAM_WRITE_AFTER_END, + }, +} = require('internal/errors'); + +const { + validateInteger, + validateObject, +} = require('internal/validators'); + +const { eos } = require('internal/streams/end-of-stream'); +const { + addAbortSignal: addAbortSignalNoValidate, +} = require('internal/streams/add-abort-signal'); +const { + queueMicrotask, +} = require('internal/process/task_queues'); + +const { + toAsyncStreamable: kToAsyncStreamable, + kValidatedSource, + drainableProtocol, +} = require('internal/streams/iter/types'); + +const { + validateBackpressure, + toUint8Array, +} = require('internal/streams/iter/utils'); + +const { Buffer } = require('buffer'); +const destroyImpl = require('internal/streams/destroy'); + +// Lazy-loaded to avoid circular dependencies. Readable and Writable +// both require this module's parent, so we defer the require. +let Readable; +let Writable; + +function lazyReadable() { + if (Readable === undefined) { + Readable = require('internal/streams/readable'); + } + return Readable; +} + +function lazyWritable() { + if (Writable === undefined) { + Writable = require('internal/streams/writable'); + } + return Writable; +} + +// ============================================================================ +// fromReadable(readable) -- classic Readable -> stream/iter async iterable +// ============================================================================ + +// Cache: one stream/iter source per Readable instance. +const fromReadableCache = new SafeWeakMap(); + +// Maximum chunks to drain into a single batch. Bounds peak memory when +// _read() synchronously pushes many chunks into the buffer. +const MAX_DRAIN_BATCH = 128; + +const { normalizeAsyncValue } = require('internal/streams/iter/from'); +const { isUint8Array } = require('internal/util/types'); + +// Normalize a batch of raw chunks from an object-mode or encoded +// Readable into Uint8Array values. Returns the normalized batch, +// or null if normalization produced no output. +async function normalizeBatch(raw) { + const batch = []; + for (let i = 0; i < raw.length; i++) { + const value = raw[i]; + if (isUint8Array(value)) { + batch.push(value); + } else { + // normalizeAsyncValue may await for async protocols (e.g. + // toAsyncStreamable on yielded objects). Stream events during + // the suspension are queued, not lost -- errors will surface + // on the next loop iteration after this yield completes. + for await (const normalized of normalizeAsyncValue(value)) { + batch.push(normalized); + } + } + } + return batch.length > 0 ? batch : null; +} + +// Batched async iterator for Readable streams. Same mechanism as +// createAsyncIterator (same event setup, same stream.read() to +// trigger _read(), same teardown) but drains all currently buffered +// chunks into a single Uint8Array[] batch per yield, amortizing the +// Promise/microtask cost across multiple chunks. +// +// When normalize is provided (object-mode / encoded streams), each +// drained batch is passed through it to convert chunks to Uint8Array. +// When normalize is null (byte-mode), chunks are already Buffers +// (Uint8Array subclass) and are yielded directly. +const nop = () => {}; + +async function* createBatchedAsyncIterator(stream, normalize) { + let callback = nop; + + function next(resolve) { + if (this === stream) { + callback(); + callback = nop; + } else { + callback = resolve; + } + } + + stream.on('readable', next); + + let error; + const cleanup = eos(stream, { writable: false }, (err) => { + error = err ? aggregateTwoErrors(error, err) : null; + callback(); + callback = nop; + }); + + try { + while (true) { + const chunk = stream.destroyed ? null : stream.read(); + if (chunk !== null) { + const batch = [chunk]; + while (batch.length < MAX_DRAIN_BATCH && + stream._readableState?.length > 0) { + const c = stream.read(); + if (c === null) break; + batch.push(c); + } + if (normalize !== null) { + const result = await normalize(batch); + if (result !== null) { + yield result; + } + } else { + yield batch; + } + } else if (error) { + throw error; + } else if (error === null) { + return; + } else { + await new Promise(next); + } + } + } catch (err) { + error = aggregateTwoErrors(error, err); + throw error; + } finally { + if (error === undefined || + (stream._readableState?.autoDestroy)) { + destroyImpl.destroyer(stream, null); + } else { + stream.off('readable', next); + cleanup(); + } + } +} + +/** + * Convert a classic Readable (or duck-type) to a stream/iter async iterable. + * + * If the object implements the toAsyncStreamable protocol, delegates to it. + * Otherwise, duck-type checks for read() + EventEmitter (on/off) and + * wraps with a batched async iterator. + * @param {object} readable - A classic Readable or duck-type with + * read() and on()/off() methods. + * @returns {AsyncIterable} A stream/iter async iterable source. + */ +function fromReadable(readable) { + if (readable == null || typeof readable !== 'object') { + throw new ERR_INVALID_ARG_TYPE('readable', 'Readable', readable); + } + + // Check cache first. + const cached = fromReadableCache.get(readable); + if (cached !== undefined) return cached; + + // Protocol path: object implements toAsyncStreamable. + if (typeof readable[kToAsyncStreamable] === 'function') { + const result = readable[kToAsyncStreamable](); + fromReadableCache.set(readable, result); + return result; + } + + // Duck-type path: object has read() and EventEmitter methods. + if (typeof readable.read !== 'function' || + typeof readable.on !== 'function') { + throw new ERR_INVALID_ARG_TYPE('readable', 'Readable', readable); + } + + // Determine normalization. If the stream has _readableState, use it + // to detect object-mode / encoding. Otherwise assume byte-mode. + const state = readable._readableState; + const normalize = (state && (state.objectMode || state.encoding)) ? + normalizeBatch : null; + + const iter = createBatchedAsyncIterator(readable, normalize); + iter[kValidatedSource] = true; + iter.stream = readable; + + fromReadableCache.set(readable, iter); + return iter; +} + + +// ============================================================================ +// toReadable(source, options) -- stream/iter source -> classic Readable +// ============================================================================ + +const kNullPrototype = { __proto__: null }; + +/** + * Create a byte-mode Readable from an AsyncIterable. + * The source must yield Uint8Array[] batches (the stream/iter native + * format). Each Uint8Array in a batch is pushed as a separate chunk. + * @param {AsyncIterable} source + * @param {object} [options] + * @param {number} [options.highWaterMark] + * @param {AbortSignal} [options.signal] + * @returns {stream.Readable} + */ +function toReadable(source, options = kNullPrototype) { + if (typeof source?.[SymbolAsyncIterator] !== 'function') { + throw new ERR_INVALID_ARG_TYPE('source', 'AsyncIterable', source); + } + + validateObject(options, 'options'); + const { + highWaterMark = 64 * 1024, + signal, + } = options; + validateInteger(highWaterMark, 'options.highWaterMark', 0); + + const ReadableCtor = lazyReadable(); + const iterator = source[SymbolAsyncIterator](); + let backpressure; + let pumping = false; + let done = false; + + const readable = new ReadableCtor({ + __proto__: null, + highWaterMark, + read() { + if (backpressure) { + const { resolve } = backpressure; + backpressure = null; + resolve(); + } else if (!pumping && !done) { + pumping = true; + pump(); + } + }, + destroy(err, cb) { + done = true; + // Wake up the pump if it's waiting on backpressure so it + // can see done === true and exit cleanly. + if (backpressure) { + backpressure.resolve(); + backpressure = null; + } + if (typeof iterator.return === 'function') { + PromisePrototypeThen(iterator.return(), + () => cb(err), (e) => cb(e || err)); + } else { + cb(err); + } + }, + }); + + if (signal) { + addAbortSignalNoValidate(signal, readable); + } + + async function pump() { + try { + while (!done) { + const { value: batch, done: iterDone } = await iterator.next(); + if (iterDone) { + done = true; + readable.push(null); + return; + } + for (let i = 0; i < batch.length; i++) { + if (!readable.push(batch[i])) { + backpressure = PromiseWithResolvers(); + await backpressure.promise; + if (done) return; + } + } + } + } catch (err) { + done = true; + readable.destroy(err); + } + } + + return readable; +} + + +// ============================================================================ +// toReadableSync(source, options) -- stream/iter source (sync) -> Readable +// ============================================================================ + +/** + * Create a byte-mode Readable from an Iterable. + * Fully synchronous -- _read() pulls from the iterator directly. + * @param {Iterable} source + * @param {object} [options] + * @param {number} [options.highWaterMark] + * @returns {stream.Readable} + */ +function toReadableSync(source, options = kNullPrototype) { + if (typeof source?.[SymbolIterator] !== 'function') { + throw new ERR_INVALID_ARG_TYPE('source', 'Iterable', source); + } + + validateObject(options, 'options'); + const { + highWaterMark = 64 * 1024, + } = options; + validateInteger(highWaterMark, 'options.highWaterMark', 0); + + const ReadableCtor = lazyReadable(); + const iterator = source[SymbolIterator](); + + return new ReadableCtor({ + __proto__: null, + highWaterMark, + read() { + for (;;) { + const { value: batch, done } = iterator.next(); + if (done) { + this.push(null); + return; + } + for (let i = 0; i < batch.length; i++) { + if (!this.push(batch[i])) return; + } + } + }, + destroy(err, cb) { + if (typeof iterator.return === 'function') iterator.return(); + cb(err); + }, + }); +} + + +// ============================================================================ +// fromWritable(writable, options) -- classic Writable -> stream/iter Writer +// ============================================================================ + +// Cache: one Writer adapter per Writable instance. +const fromWritableCache = new SafeWeakMap(); + +/** + * Create a stream/iter Writer adapter from a classic Writable (or duck-type). + * + * Duck-type requirements: write() and on()/off() methods. + * Falls back to sensible defaults for missing properties like + * writableHighWaterMark, writableLength, writableObjectMode. + * @param {object} writable - A classic Writable or duck-type. + * @param {object} [options] + * @param {string} [options.backpressure] - 'strict', 'block', + * 'drop-newest'. 'drop-oldest' is not supported. + * @returns {object} A stream/iter Writer adapter. + */ +function fromWritable(writable, options = kNullPrototype) { + if (writable == null || + typeof writable.write !== 'function' || + typeof writable.on !== 'function') { + throw new ERR_INVALID_ARG_TYPE('writable', 'Writable', writable); + } + + // Return cached adapter if available. + const cached = fromWritableCache.get(writable); + if (cached !== undefined) return cached; + + validateObject(options, 'options'); + const { + backpressure = 'strict', + } = options; + validateBackpressure(backpressure); + + // The Writer interface is bytes-only. Object-mode Writables expect + // arbitrary JS values, which is incompatible. + if (writable.writableObjectMode) { + throw new ERR_INVALID_STATE( + 'Cannot create a stream/iter Writer from an object-mode Writable'); + } + + // drop-oldest is not supported for classic stream.Writable. The + // Writable's internal buffer stores individual { chunk, encoding, + // callback } entries with no concept of batch boundaries. A writev() + // call fans out into N separate buffer entries, so a subsequent + // drop-oldest eviction could partially tear apart an earlier atomic + // writev batch. The PushWriter avoids this because writev occupies a + // single slot. Supporting drop-oldest here would require either + // accepting partial writev eviction or adding batch tracking to the + // buffer -- neither is acceptable without a deeper rework of Writable + // internals. + if (backpressure === 'drop-oldest') { + throw new ERR_INVALID_ARG_VALUE('options.backpressure', backpressure, + 'drop-oldest is not supported for classic stream.Writable'); + } + + // Fall back to sensible defaults for duck-typed streams that may not + // expose the full stream.Writable property set. + const hwm = writable.writableHighWaterMark ?? 16384; + let totalBytes = 0; + + // Waiters pending on backpressure resolution (block policy only). + // Multiple un-awaited writes can each add a waiter, so this must be + // a list. A single persistent 'drain' listener and 'error' listener + // (installed once lazily) resolve or reject all waiters to avoid + // accumulating per-write listeners on the stream. + let waiters = []; + let listenersInstalled = false; + let onDrain; + let onError; + + function installListeners() { + if (listenersInstalled) return; + listenersInstalled = true; + onDrain = () => { + const pending = waiters; + waiters = []; + for (let i = 0; i < pending.length; i++) { + pending[i].resolve(); + } + }; + onError = (err) => { + const pending = waiters; + waiters = []; + for (let i = 0; i < pending.length; i++) { + pending[i].reject(err); + } + }; + writable.on('drain', onDrain); + writable.on('error', onError); + } + + // Reject all pending waiters and remove the drain/error listeners. + function cleanup(err) { + const pending = waiters; + waiters = []; + for (let i = 0; i < pending.length; i++) { + pending[i].reject(err ?? new AbortError()); + } + if (!listenersInstalled) return; + listenersInstalled = false; + writable.removeListener('drain', onDrain); + writable.removeListener('error', onError); + } + + function waitForDrain() { + const { promise, resolve, reject } = PromiseWithResolvers(); + waiters.push({ __proto__: null, resolve, reject }); + installListeners(); + return promise; + } + + function isWritable() { + // Duck-typed streams may not have these properties -- treat missing + // as false (i.e., writable is still open). + return !(writable.destroyed ?? false) && + !(writable.writableFinished ?? false) && + !(writable.writableEnded ?? false); + } + + function isFull() { + return (writable.writableLength ?? 0) >= hwm; + } + + const writer = { + __proto__: null, + + get desiredSize() { + if (!isWritable()) return null; + return MathMax(0, hwm - (writable.writableLength ?? 0)); + }, + + writeSync(chunk) { + return false; + }, + + writevSync(chunks) { + return false; + }, + + // Backpressure semantics: write() resolves when the data is accepted + // into the Writable's internal buffer, NOT when _write() has flushed + // it to the underlying resource. This matches the Writer spec -- the + // PushWriter resolves on buffer acceptance too. Classic Writable flow + // control works the same way: write rapidly until write() returns + // false, then wait for 'drain'. The _write callback is involved in + // backpressure indirectly -- 'drain' fires after callbacks drain the + // buffer below highWaterMark. Per-write errors from _write surface + // as 'error' events caught by our generic error handler, rejecting + // the next pending operation rather than the already-resolved one. + // + // The options.signal parameter from the Writer interface is ignored. + // Classic stream.Writable has no per-write abort signal support; + // cancellation should be handled at the pipeline level instead. + write(chunk) { + if (!isWritable()) { + return PromiseReject(new ERR_STREAM_WRITE_AFTER_END()); + } + + let bytes; + try { + bytes = toUint8Array(chunk); + } catch (err) { + return PromiseReject(err); + } + + if (backpressure === 'strict' && isFull()) { + return PromiseReject(new ERR_INVALID_STATE.RangeError( + 'Backpressure violation: buffer is full. ' + + 'Await each write() call to respect backpressure.')); + } + + if (backpressure === 'drop-newest' && isFull()) { + // Silently discard. Still count bytes for consistency with + // PushWriter, which counts dropped bytes in totalBytes. + totalBytes += TypedArrayPrototypeGetByteLength(bytes); + return PromiseResolve(); + } + + totalBytes += TypedArrayPrototypeGetByteLength(bytes); + const ok = writable.write(bytes); + if (ok) return PromiseResolve(); + + // backpressure === 'block' (or strict with room that filled on + // this write -- writable.write() accepted the data but returned + // false indicating the buffer is now at/over hwm). + if (backpressure === 'block') { + return waitForDrain(); + } + + // strict: the write was accepted (there was room before writing) + // but the buffer is now full. Resolve -- the *next* write will + // be rejected if the caller ignores backpressure. + return PromiseResolve(); + }, + + writev(chunks) { + if (!ArrayIsArray(chunks)) { + throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks); + } + if (!isWritable()) { + return PromiseReject(new ERR_STREAM_WRITE_AFTER_END()); + } + + if (backpressure === 'strict' && isFull()) { + return PromiseReject(new ERR_INVALID_STATE.RangeError( + 'Backpressure violation: buffer is full. ' + + 'Await each write() call to respect backpressure.')); + } + + if (backpressure === 'drop-newest' && isFull()) { + // Discard entire batch. + for (let i = 0; i < chunks.length; i++) { + totalBytes += + TypedArrayPrototypeGetByteLength(toUint8Array(chunks[i])); + } + return PromiseResolve(); + } + + if (typeof writable.cork === 'function') writable.cork(); + let ok = true; + for (let i = 0; i < chunks.length; i++) { + const bytes = toUint8Array(chunks[i]); + totalBytes += TypedArrayPrototypeGetByteLength(bytes); + ok = writable.write(bytes); + } + if (typeof writable.uncork === 'function') writable.uncork(); + + if (ok) return PromiseResolve(); + + if (backpressure === 'block') { + return waitForDrain(); + } + + return PromiseResolve(); + }, + + endSync() { + return -1; + }, + + // options.signal is ignored for the same reason as write(). + end() { + if ((writable.writableFinished ?? false) || + (writable.destroyed ?? false)) { + cleanup(); + return PromiseResolve(totalBytes); + } + + const { promise, resolve, reject } = PromiseWithResolvers(); + + if (!(writable.writableEnded ?? false)) { + writable.end(); + } + + eos(writable, { writable: true, readable: false }, (err) => { + cleanup(err); + if (err) reject(err); + else resolve(totalBytes); + }); + + return promise; + }, + + fail(reason) { + cleanup(reason); + if (typeof writable.destroy === 'function') { + writable.destroy(reason); + } + }, + + [SymbolAsyncDispose]() { + if (isWritable()) { + cleanup(); + if (typeof writable.destroy === 'function') { + writable.destroy(); + } + } + return PromiseResolve(); + }, + + [SymbolDispose]() { + if (isWritable()) { + cleanup(); + if (typeof writable.destroy === 'function') { + writable.destroy(); + } + } + }, + }; + + // drainableProtocol + writer[drainableProtocol] = function() { + if (!isWritable()) return null; + if ((writable.writableLength ?? 0) < hwm) { + return PromiseResolve(true); + } + const { promise, resolve } = PromiseWithResolvers(); + waiters.push({ + __proto__: null, + resolve() { resolve(true); }, + reject() { resolve(false); }, + }); + installListeners(); + return promise; + }; + + fromWritableCache.set(writable, writer); + return writer; +} + + +// ============================================================================ +// toWritable(writer) -- stream/iter Writer -> classic Writable +// ============================================================================ + +/** + * Create a classic stream.Writable backed by a stream/iter Writer. + * Each _write/_writev call delegates to the Writer's methods, + * attempting the sync path first (writeSync/writevSync/endSync) and + * falling back to async if the sync path returns false or throws. + * @param {object} writer - A stream/iter Writer (only write() is required). + * @returns {stream.Writable} + */ +function toWritable(writer) { + if (typeof writer?.write !== 'function') { + throw new ERR_INVALID_ARG_TYPE('writer', 'Writer', writer); + } + + const WritableCtor = lazyWritable(); + + const hasWriteSync = typeof writer.writeSync === 'function'; + const hasWritev = typeof writer.writev === 'function'; + const hasWritevSync = hasWritev && + typeof writer.writevSync === 'function'; + const hasEnd = typeof writer.end === 'function'; + const hasEndSync = hasEnd && + typeof writer.endSync === 'function'; + const hasFail = typeof writer.fail === 'function'; + + // Try-sync-first pattern: attempt the synchronous method and + // fall back to the async method if it returns false (indicating + // the sync path was not accepted) or throws. When the sync path + // succeeds, the callback is deferred via queueMicrotask to + // preserve the async resolution contract that Writable internals + // expect from _write/_writev/_final callbacks. + + function _write(chunk, encoding, cb) { + const bytes = typeof chunk === 'string' ? + Buffer.from(chunk, encoding) : chunk; + if (hasWriteSync) { + try { + if (writer.writeSync(bytes)) { + queueMicrotask(cb); + return; + } + } catch { + // Sync path threw -- fall through to async. + } + } + try { + PromisePrototypeThen(writer.write(bytes), () => cb(), cb); + } catch (err) { + cb(err); + } + } + + function _writev(entries, cb) { + const chunks = []; + for (let i = 0; i < entries.length; i++) { + const { chunk, encoding } = entries[i]; + chunks[i] = typeof chunk === 'string' ? + Buffer.from(chunk, encoding) : chunk; + } + if (hasWritevSync) { + try { + if (writer.writevSync(chunks)) { + queueMicrotask(cb); + return; + } + } catch { + // Sync path threw -- fall through to async. + } + } + try { + PromisePrototypeThen(writer.writev(chunks), () => cb(), cb); + } catch (err) { + cb(err); + } + } + + function _final(cb) { + if (!hasEnd) { + queueMicrotask(cb); + return; + } + if (hasEndSync) { + try { + const result = writer.endSync(); + if (result >= 0) { + queueMicrotask(cb); + return; + } + } catch { + // Sync path threw -- fall through to async. + } + } + try { + PromisePrototypeThen(writer.end(), () => cb(), cb); + } catch (err) { + cb(err); + } + } + + function _destroy(err, cb) { + if (err && hasFail) { + writer.fail(err); + } + cb(); + } + + const writableOptions = { + __proto__: null, + // Use MAX_SAFE_INTEGER to effectively disable the Writable's + // internal buffering. The underlying stream/iter Writer has its + // own backpressure handling; we want _write to be called + // immediately so the Writer can manage flow control directly. + highWaterMark: NumberMAX_SAFE_INTEGER, + write: _write, + final: _final, + destroy: _destroy, + }; + + if (hasWritev) { + writableOptions.writev = _writev; + } + + return new WritableCtor(writableOptions); +} + + +module.exports = { + // Shared helpers used by Readable.prototype[toAsyncStreamable] in + // readable.js to avoid duplicating the batched iterator logic. + createBatchedAsyncIterator, + normalizeBatch, + + // Public utilities exported from 'stream/iter'. + fromReadable, + fromWritable, + toReadable, + toReadableSync, + toWritable, +}; diff --git a/lib/internal/streams/iter/from.js b/lib/internal/streams/iter/from.js index d76f430ab0d51e..0533f0e3810398 100644 --- a/lib/internal/streams/iter/from.js +++ b/lib/internal/streams/iter/from.js @@ -37,6 +37,7 @@ const { } = require('internal/util/types'); const { + kValidatedSource, toStreamable, toAsyncStreamable, } = require('internal/streams/iter/types'); @@ -483,6 +484,11 @@ function from(input) { throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input); } + // Fast path: validated source already yields valid Uint8Array[] batches + if (input[kValidatedSource]) { + return input; + } + // Check for primitives first (ByteInput) if (isPrimitiveChunk(input)) { const chunk = primitiveToUint8Array(input); @@ -531,11 +537,22 @@ function from(input) { // Check toAsyncStreamable protocol (takes precedence over toStreamable and // iteration protocols) if (typeof input[toAsyncStreamable] === 'function') { + const result = input[toAsyncStreamable](); + // Synchronous validated source (e.g. Readable batched iterator) + if (result?.[kValidatedSource]) { + return result; + } return { __proto__: null, async *[SymbolAsyncIterator]() { - const result = await input[toAsyncStreamable](); - yield* from(result)[SymbolAsyncIterator](); + // The result may be a Promise. Check validated on both the Promise + // itself (if tagged) and the resolved value. + const resolved = await result; + if (resolved?.[kValidatedSource]) { + yield* resolved[SymbolAsyncIterator](); + return; + } + yield* from(resolved)[SymbolAsyncIterator](); }, }; } diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 8c49941ddb54ed..df5ca2826a9ac5 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -51,7 +51,7 @@ const { } = require('internal/streams/iter/utils'); const { - kTrustedTransform, + kValidatedTransform, } = require('internal/streams/iter/types'); // ============================================================================= @@ -564,12 +564,12 @@ async function* applyStatefulAsyncTransform(source, transform, options) { } /** - * Fast path for trusted stateful transforms (e.g. compression). + * Fast path for validated stateful transforms (e.g. compression). * Skips withFlushAsync (transform handles done internally) and * skips isUint8ArrayBatch validation (transform guarantees valid output). * @yields {Uint8Array[]} */ -async function* applyTrustedStatefulAsyncTransform(source, transform, options) { +async function* applyValidatedStatefulAsyncTransform(source, transform, options) { const output = transform(source, options); for await (const batch of output) { if (batch.length > 0) { @@ -639,8 +639,8 @@ async function* createAsyncPipeline(source, transforms, signal) { statelessRun = []; } const opts = { __proto__: null, signal: transformSignal }; - if (transform[kTrustedTransform]) { - current = applyTrustedStatefulAsyncTransform( + if (transform[kValidatedTransform]) { + current = applyValidatedStatefulAsyncTransform( current, transform.transform, opts); } else { current = applyStatefulAsyncTransform( diff --git a/lib/internal/streams/iter/transform.js b/lib/internal/streams/iter/transform.js index 4cb417ed98ce32..9782f5f50ebf2c 100644 --- a/lib/internal/streams/iter/transform.js +++ b/lib/internal/streams/iter/transform.js @@ -37,7 +37,7 @@ const { } = require('internal/errors'); const { lazyDOMException } = require('internal/util'); const { isArrayBufferView, isAnyArrayBuffer } = require('internal/util/types'); -const { kTrustedTransform } = require('internal/streams/iter/types'); +const { kValidatedTransform } = require('internal/streams/iter/types'); const { checkRangesOrGetDefault, validateFiniteNumber, @@ -306,7 +306,7 @@ function createZstdHandle(mode, options, processCallback, onError) { function makeZlibTransform(createHandleFn, processFlag, finishFlag) { return { __proto__: null, - [kTrustedTransform]: true, + [kValidatedTransform]: true, transform: async function*(source, options) { const { signal } = options; diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js index c205db00e3782a..99ddc8fd582770 100644 --- a/lib/internal/streams/iter/types.js +++ b/lib/internal/streams/iter/types.js @@ -45,20 +45,30 @@ const shareSyncProtocol = SymbolFor('Stream.shareSyncProtocol'); const drainableProtocol = SymbolFor('Stream.drainableProtocol'); /** - * Internal sentinel for trusted stateful transforms. A transform object - * with [kTrustedTransform] = true signals that: + * Internal sentinel for validated stateful transforms. A transform object + * with [kValidatedTransform] = true signals that: * 1. It handles source exhaustion (done) internally - no withFlushAsync * wrapper needed. * 2. It always yields valid Uint8Array[] batches - no isUint8ArrayBatch * validation needed on each yield. * This is NOT a public protocol symbol - it uses Symbol() not Symbol.for(). */ -const kTrustedTransform = Symbol('kTrustedTransform'); +const kValidatedTransform = Symbol('kValidatedTransform'); + +/** + * Internal sentinel for validated sources. An async iterable with + * [kValidatedSource] = true signals that it already yields valid + * Uint8Array[] batches - no normalizeAsyncSource wrapper needed. + * from() will return such sources directly, skipping all normalization. + * This is NOT a public protocol symbol - it uses Symbol() not Symbol.for(). + */ +const kValidatedSource = Symbol('kValidatedSource'); module.exports = { broadcastProtocol, drainableProtocol, - kTrustedTransform, + kValidatedSource, + kValidatedTransform, shareProtocol, shareSyncProtocol, toAsyncStreamable, diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 919c527a2be6f8..8ee754edcad14b 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -35,6 +35,7 @@ const { Symbol, SymbolAsyncDispose, SymbolAsyncIterator, + SymbolFor, SymbolSpecies, TypedArrayPrototypeSet, } = primordials; @@ -52,6 +53,8 @@ const { } = require('internal/streams/add-abort-signal'); const { eos } = require('internal/streams/end-of-stream'); +const { getOptionValue } = require('internal/options'); + let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { debug = fn; }); @@ -82,6 +85,7 @@ const { ERR_INVALID_ARG_TYPE, ERR_METHOD_NOT_IMPLEMENTED, ERR_OUT_OF_RANGE, + ERR_STREAM_ITER_MISSING_FLAG, ERR_STREAM_PUSH_AFTER_EOF, ERR_STREAM_UNSHIFT_AFTER_END_EVENT, ERR_UNKNOWN_ENCODING, @@ -1796,3 +1800,42 @@ Readable.wrap = function(src, options) { }, }).wrap(src); }; + +// Interop with the stream/iter API via the toAsyncStreamable protocol. +// +// The batched iterator logic lives in classic.js (shared with the +// fromReadable() utility for duck-typed streams). This prototype method +// calls createBatchedAsyncIterator directly -- it must NOT call +// fromReadable() since fromReadable() checks for toAsyncStreamable, +// which would create infinite recursion. +// +// The flag cannot be checked at module load time (readable.js loads during +// bootstrap before options are available). Instead, toAsyncStreamable is +// always defined but lazily initializes on first call -- throwing if the +// flag is not set. +{ + const toAsyncStreamable = SymbolFor('Stream.toAsyncStreamable'); + let createBatchedAsyncIterator; + let normalizeBatch; + let kValidatedSource; + + Readable.prototype[toAsyncStreamable] = function() { + if (createBatchedAsyncIterator === undefined) { + if (!getOptionValue('--experimental-stream-iter')) { + throw new ERR_STREAM_ITER_MISSING_FLAG(); + } + ({ + createBatchedAsyncIterator, + normalizeBatch, + } = require('internal/streams/iter/classic')); + ({ kValidatedSource } = require('internal/streams/iter/types')); + } + const state = this._readableState; + const normalize = (state.objectMode || state.encoding) ? + normalizeBatch : null; + const iter = createBatchedAsyncIterator(this, normalize); + iter[kValidatedSource] = true; + iter.stream = this; + return iter; + }; +} diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index e934c38bbf9acc..89d384e9f2fb8f 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -47,7 +47,6 @@ const Stream = require('internal/streams/legacy').Stream; const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); const { eos } = require('internal/streams/end-of-stream'); - const { addAbortSignal, } = require('internal/streams/add-abort-signal'); diff --git a/lib/stream/iter.js b/lib/stream/iter.js index e77e485a7f2bd5..4cb499bb161694 100644 --- a/lib/stream/iter.js +++ b/lib/stream/iter.js @@ -50,6 +50,15 @@ const { ondrain, } = require('internal/streams/iter/consumers'); +// Classic stream interop (Node.js-specific, not part of the spec) +const { + fromReadable, + fromWritable, + toReadable, + toReadableSync, + toWritable, +} = require('internal/streams/iter/classic'); + // Multi-consumer const { broadcast, Broadcast } = require('internal/streams/iter/broadcast'); const { @@ -177,4 +186,11 @@ module.exports = { tap, tapSync, ondrain, + + // Classic stream interop + fromReadable, + fromWritable, + toReadable, + toReadableSync, + toWritable, }; diff --git a/test/parallel/test-stream-iter-readable-interop-disabled.js b/test/parallel/test-stream-iter-readable-interop-disabled.js new file mode 100644 index 00000000000000..86c8cc141387cd --- /dev/null +++ b/test/parallel/test-stream-iter-readable-interop-disabled.js @@ -0,0 +1,57 @@ +'use strict'; + +// Tests that toAsyncStreamable throws ERR_STREAM_ITER_MISSING_FLAG +// when --experimental-stream-iter is not enabled. + +const common = require('../common'); +const assert = require('assert'); +const { spawnPromisified } = common; + +async function testToAsyncStreamableWithoutFlag() { + const { stderr, code } = await spawnPromisified(process.execPath, [ + '-e', + ` + const { Readable } = require('stream'); + const r = new Readable({ read() {} }); + r[Symbol.for('Stream.toAsyncStreamable')](); + `, + ]); + assert.notStrictEqual(code, 0); + assert.match(stderr, /ERR_STREAM_ITER_MISSING_FLAG/); +} + +async function testToAsyncStreamableWithFlag() { + const { code } = await spawnPromisified(process.execPath, [ + '--experimental-stream-iter', + '-e', + ` + const { Readable } = require('stream'); + const r = new Readable({ + read() { this.push(Buffer.from('ok')); this.push(null); } + }); + const sym = Symbol.for('Stream.toAsyncStreamable'); + const iter = r[sym](); + // Should not throw, and should have stream property + if (!iter.stream) process.exit(1); + `, + ]); + assert.strictEqual(code, 0); +} + +async function testStreamIterModuleWithoutFlag() { + // Requiring 'stream/iter' without the flag should not be possible + // since the module is gated behind --experimental-stream-iter. + const { code } = await spawnPromisified(process.execPath, [ + '-e', + ` + require('stream/iter'); + `, + ]); + assert.notStrictEqual(code, 0); +} + +Promise.all([ + testToAsyncStreamableWithoutFlag(), + testToAsyncStreamableWithFlag(), + testStreamIterModuleWithoutFlag(), +]).then(common.mustCall()); diff --git a/test/parallel/test-stream-iter-readable-interop.js b/test/parallel/test-stream-iter-readable-interop.js new file mode 100644 index 00000000000000..8100b54168be87 --- /dev/null +++ b/test/parallel/test-stream-iter-readable-interop.js @@ -0,0 +1,640 @@ +// Flags: --experimental-stream-iter +'use strict'; + +// Tests for classic Readable stream interop with the stream/iter API +// via the toAsyncStreamable protocol and kValidatedSource optimization. + +const common = require('../common'); +const assert = require('assert'); +const { Readable } = require('stream'); +const { + from, + pull, + bytes, + text, +} = require('stream/iter'); + +const toAsyncStreamable = Symbol.for('Stream.toAsyncStreamable'); + +// ============================================================================= +// toAsyncStreamable protocol is present on Readable.prototype +// ============================================================================= + +function testProtocolExists() { + assert.strictEqual(typeof Readable.prototype[toAsyncStreamable], 'function'); + + const readable = new Readable({ read() {} }); + assert.strictEqual(typeof readable[toAsyncStreamable], 'function'); +} + +// ============================================================================= +// Byte-mode Readable: basic round-trip through from() +// ============================================================================= + +async function testByteModeThroughFrom() { + const readable = new Readable({ + read() { + this.push(Buffer.from('hello')); + this.push(Buffer.from(' world')); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello world'); +} + +// ============================================================================= +// Byte-mode Readable: basic round-trip through pull() +// ============================================================================= + +async function testByteModeThroughPull() { + const readable = new Readable({ + read() { + this.push(Buffer.from('pull ')); + this.push(Buffer.from('test')); + this.push(null); + }, + }); + + const result = await text(pull(readable)); + assert.strictEqual(result, 'pull test'); +} + +// ============================================================================= +// Byte-mode Readable: bytes consumer +// ============================================================================= + +async function testByteModeBytes() { + const data = Buffer.from('binary data here'); + const readable = new Readable({ + read() { + this.push(data); + this.push(null); + }, + }); + + const result = await bytes(from(readable)); + assert.deepStrictEqual(result, new Uint8Array(data)); +} + +// ============================================================================= +// Byte-mode Readable: batching - multiple buffered chunks yield as one batch +// ============================================================================= + +async function testBatchingBehavior() { + const readable = new Readable({ + read() { + // Push multiple chunks synchronously so they all buffer + for (let i = 0; i < 10; i++) { + this.push(Buffer.from(`chunk${i}`)); + } + this.push(null); + }, + }); + + const source = from(readable); + const batches = []; + for await (const batch of source) { + batches.push(batch); + } + + // All chunks were buffered synchronously, so they should come out + // as fewer batches than individual chunks (ideally one batch). + assert.ok(batches.length < 10, + `Expected fewer batches than chunks, got ${batches.length}`); + + // Total data should be correct + const allChunks = batches.flat(); + const combined = Buffer.concat(allChunks); + let expected = ''; + for (let i = 0; i < 10; i++) { + expected += `chunk${i}`; + } + assert.strictEqual(combined.toString(), expected); +} + +// ============================================================================= +// Byte-mode Readable: kValidatedSource is set +// ============================================================================= + +function testTrustedSourceByteMode() { + const readable = new Readable({ read() {} }); + const result = readable[toAsyncStreamable](); + // kValidatedSource is a private symbol, but we can verify the result + // is used directly by from() without wrapping by checking it has + // Symbol.asyncIterator + assert.strictEqual(typeof result[Symbol.asyncIterator], 'function'); + assert.strictEqual(result.stream, readable); +} + +// ============================================================================= +// Byte-mode Readable: multi-read with delayed pushes +// ============================================================================= + +async function testDelayedPushes() { + let pushCount = 0; + const readable = new Readable({ + read() { + if (pushCount < 3) { + setTimeout(() => { + this.push(Buffer.from(`delayed${pushCount}`)); + pushCount++; + if (pushCount === 3) { + this.push(null); + } + }, 10); + } + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'delayed0delayed1delayed2'); +} + +// ============================================================================= +// Byte-mode Readable: empty stream +// ============================================================================= + +async function testEmptyStream() { + const readable = new Readable({ + read() { + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, ''); +} + +// ============================================================================= +// Byte-mode Readable: error propagation +// ============================================================================= + +async function testErrorPropagation() { + const readable = new Readable({ + read() { + process.nextTick(() => this.destroy(new Error('test error'))); + }, + }); + + await assert.rejects( + text(from(readable)), + (err) => err.message === 'test error', + ); +} + +// ============================================================================= +// Byte-mode Readable: with transforms +// ============================================================================= + +async function testWithTransform() { + const readable = new Readable({ + read() { + this.push(Buffer.from('hello')); + this.push(null); + }, + }); + + // Uppercase transform + function uppercase(chunks) { + if (chunks === null) return null; + return chunks.map((c) => { + const buf = Buffer.from(c); + for (let i = 0; i < buf.length; i++) { + if (buf[i] >= 97 && buf[i] <= 122) buf[i] -= 32; + } + return buf; + }); + } + + const result = await text(pull(readable, uppercase)); + assert.strictEqual(result, 'HELLO'); +} + +// ============================================================================= +// Object-mode Readable: strings are normalized to Uint8Array +// ============================================================================= + +async function testObjectModeStrings() { + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + this.push(' object'); + this.push(' mode'); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello object mode'); +} + +// ============================================================================= +// Object-mode Readable: Uint8Array chunks pass through +// ============================================================================= + +async function testObjectModeUint8Array() { + const readable = new Readable({ + objectMode: true, + read() { + this.push(new Uint8Array([72, 73])); // "HI" + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'HI'); +} + +// ============================================================================= +// Object-mode Readable: mixed types (strings + Uint8Array) +// ============================================================================= + +async function testObjectModeMixed() { + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + this.push(Buffer.from(' ')); + this.push('world'); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello world'); +} + +// ============================================================================= +// Object-mode Readable: toStreamable protocol objects +// ============================================================================= + +async function testObjectModeToStreamable() { + const toStreamableSym = Symbol.for('Stream.toStreamable'); + const readable = new Readable({ + objectMode: true, + read() { + this.push({ + [toStreamableSym]() { + return 'from-protocol'; + }, + }); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'from-protocol'); +} + +// ============================================================================= +// Object-mode Readable: kValidatedSource is set +// ============================================================================= + +function testTrustedSourceObjectMode() { + const readable = new Readable({ objectMode: true, read() {} }); + const result = readable[toAsyncStreamable](); + assert.strictEqual(typeof result[Symbol.asyncIterator], 'function'); + assert.strictEqual(result.stream, readable); +} + +// ============================================================================= +// Encoded Readable: strings are re-encoded to Uint8Array +// ============================================================================= + +async function testEncodedReadable() { + const readable = new Readable({ + encoding: 'utf8', + read() { + this.push(Buffer.from('encoded')); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'encoded'); +} + +// ============================================================================= +// Readable.from() source: verify interop with Readable.from() +// ============================================================================= + +async function testReadableFrom() { + const readable = Readable.from(['chunk1', 'chunk2', 'chunk3']); + + const result = await text(from(readable)); + assert.strictEqual(result, 'chunk1chunk2chunk3'); +} + +// ============================================================================= +// Byte-mode Readable: large data +// ============================================================================= + +async function testLargeData() { + const totalSize = 1024 * 1024; // 1 MB + const chunkSize = 16384; + let pushed = 0; + + const readable = new Readable({ + read() { + if (pushed < totalSize) { + const size = Math.min(chunkSize, totalSize - pushed); + const buf = Buffer.alloc(size, 0x41); // Fill with 'A' + this.push(buf); + pushed += size; + } else { + this.push(null); + } + }, + }); + + const result = await bytes(from(readable)); + assert.strictEqual(result.length, totalSize); + assert.strictEqual(result[0], 0x41); + assert.strictEqual(result[totalSize - 1], 0x41); +} + +// ============================================================================= +// Byte-mode Readable: consumer return (early termination) +// ============================================================================= + +async function testEarlyTermination() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`chunk${pushCount++}`)); + // Never pushes null - infinite stream + }, + }); + + // Take only the first batch + const source = from(readable); + const batches = []; + for await (const batch of source) { + batches.push(batch); + break; // Stop after first batch + } + + assert.ok(batches.length >= 1); + // Stream should be destroyed after consumer return + // Give it a tick to clean up + await new Promise((resolve) => setTimeout(resolve, 50)); + assert.ok(readable.destroyed); +} + +// ============================================================================= +// Byte-mode Readable: pull() with compression transform +// ============================================================================= + +async function testWithCompression() { + const { + compressGzip, + decompressGzip, + } = require('zlib/iter'); + + const readable = new Readable({ + read() { + this.push(Buffer.from('compress me via classic Readable')); + this.push(null); + }, + }); + + const compressed = pull(readable, compressGzip()); + const result = await text(pull(compressed, decompressGzip())); + assert.strictEqual(result, 'compress me via classic Readable'); +} + +// ============================================================================= +// Object-mode Readable: error propagation +// ============================================================================= + +async function testObjectModeError() { + const readable = new Readable({ + objectMode: true, + read() { + process.nextTick(() => this.destroy(new Error('object error'))); + }, + }); + + await assert.rejects( + text(from(readable)), + (err) => err.message === 'object error', + ); +} + +// ============================================================================= +// Stream destroyed mid-iteration +// ============================================================================= + +async function testDestroyMidIteration() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`chunk${pushCount++}`)); + // Never pushes null - infinite stream + }, + }); + + const chunks = []; + await assert.rejects(async () => { + for await (const batch of from(readable)) { + chunks.push(...batch); + if (chunks.length >= 3) { + readable.destroy(); + } + } + }, { code: 'ERR_STREAM_PREMATURE_CLOSE' }); + assert.ok(readable.destroyed); + assert.ok(chunks.length >= 3); +} + +// ============================================================================= +// Error after partial data +// ============================================================================= + +async function testErrorAfterPartialData() { + let count = 0; + const readable = new Readable({ + read() { + if (count < 3) { + this.push(Buffer.from(`ok${count++}`)); + } else { + this.destroy(new Error('late error')); + } + }, + }); + + const chunks = []; + await assert.rejects(async () => { + for await (const batch of from(readable)) { + chunks.push(...batch); + } + }, { message: 'late error' }); + assert.ok(chunks.length > 0, 'Should have received partial data'); +} + +// ============================================================================= +// Multiple consumers (second iteration yields empty) +// ============================================================================= + +async function testMultipleConsumers() { + const readable = new Readable({ + read() { + this.push(Buffer.from('data')); + this.push(null); + }, + }); + + const first = await text(from(readable)); + assert.strictEqual(first, 'data'); + + // Second consumption - stream is already consumed/destroyed + const second = await text(from(readable)); + assert.strictEqual(second, ''); +} + +// ============================================================================= +// highWaterMark: 0 - each chunk becomes its own batch +// ============================================================================= + +async function testHighWaterMarkZero() { + let pushCount = 0; + const readable = new Readable({ + highWaterMark: 0, + read() { + if (pushCount < 3) { + this.push(Buffer.from(`hwm0-${pushCount++}`)); + } else { + this.push(null); + } + }, + }); + + const batches = []; + for await (const batch of from(readable)) { + batches.push(batch); + } + + // With HWM=0, buffer is always empty so drain loop never fires. + // Each chunk should be its own batch. + assert.strictEqual(batches.length, 3); + const combined = Buffer.concat(batches.flat()); + assert.strictEqual(combined.toString(), 'hwm0-0hwm0-1hwm0-2'); +} + +// ============================================================================= +// Duplex stream (Duplex extends Readable, toAsyncStreamable should work) +// ============================================================================= + +async function testDuplexStream() { + const { Duplex } = require('stream'); + + const duplex = new Duplex({ + read() { + this.push(Buffer.from('duplex-data')); + this.push(null); + }, + write(chunk, enc, cb) { cb(); }, + }); + + const result = await text(from(duplex)); + assert.strictEqual(result, 'duplex-data'); +} + +// ============================================================================= +// setEncoding called dynamically after construction +// ============================================================================= + +async function testSetEncodingDynamic() { + const readable = new Readable({ + read() { + this.push(Buffer.from('dynamic-enc')); + this.push(null); + }, + }); + + readable.setEncoding('utf8'); + + const result = await text(from(readable)); + assert.strictEqual(result, 'dynamic-enc'); +} + +// ============================================================================= +// AbortSignal cancellation +// ============================================================================= + +async function testAbortSignal() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`sig${pushCount++}`)); + // Never pushes null - infinite stream + }, + }); + + const ac = new AbortController(); + const chunks = []; + + await assert.rejects(async () => { + for await (const batch of pull(readable, { signal: ac.signal })) { + chunks.push(...batch); + if (chunks.length >= 2) { + ac.abort(); + } + } + }, { name: 'AbortError' }); + assert.ok(chunks.length >= 2); +} + +// ============================================================================= +// kValidatedSource identity - from() returns same object for validated sources +// ============================================================================= + +function testTrustedSourceIdentity() { + const readable = new Readable({ read() {} }); + const iter = readable[toAsyncStreamable](); + + // from() should return the validated iterator directly (same reference), + // not wrap it in another generator + const result = from(iter); + assert.strictEqual(result, iter); +} + +// ============================================================================= +// Run all tests +// ============================================================================= + +testProtocolExists(); +testTrustedSourceByteMode(); +testTrustedSourceObjectMode(); +testTrustedSourceIdentity(); + +Promise.all([ + testByteModeThroughFrom(), + testByteModeThroughPull(), + testByteModeBytes(), + testBatchingBehavior(), + testDelayedPushes(), + testEmptyStream(), + testErrorPropagation(), + testWithTransform(), + testObjectModeStrings(), + testObjectModeUint8Array(), + testObjectModeMixed(), + testObjectModeToStreamable(), + testEncodedReadable(), + testReadableFrom(), + testLargeData(), + testEarlyTermination(), + testWithCompression(), + testObjectModeError(), + testDestroyMidIteration(), + testErrorAfterPartialData(), + testMultipleConsumers(), + testHighWaterMarkZero(), + testDuplexStream(), + testSetEncodingDynamic(), + testAbortSignal(), +]).then(common.mustCall()); diff --git a/test/parallel/test-stream-iter-to-readable.js b/test/parallel/test-stream-iter-to-readable.js new file mode 100644 index 00000000000000..4cb5600e3ba424 --- /dev/null +++ b/test/parallel/test-stream-iter-to-readable.js @@ -0,0 +1,620 @@ +// Flags: --experimental-stream-iter +'use strict'; + +// Tests for toReadable() and toReadableSync() which create byte-mode +// Readable streams from stream/iter sources. + +const common = require('../common'); +const assert = require('assert'); +const { Writable } = require('stream'); +const { + from, + fromSync, + pull, + text, + toReadable, + toReadableSync, +} = require('stream/iter'); + +function collect(readable) { + return new Promise((resolve, reject) => { + const chunks = []; + readable.on('data', (chunk) => chunks.push(chunk)); + readable.on('end', () => resolve(Buffer.concat(chunks))); + readable.on('error', reject); + }); +} + +// ============================================================================= +// fromStreamIter: basic async round-trip +// ============================================================================= + +async function testBasicAsync() { + const source = from('hello world'); + const readable = toReadable(source); + + assert.strictEqual(readable.readableObjectMode, false); + + const result = await collect(readable); + assert.strictEqual(result.toString(), 'hello world'); +} + +// ============================================================================= +// fromStreamIter: multiple batches, all chunks arrive in order +// ============================================================================= + +async function testMultiBatchAsync() { + async function* gen() { + for (let i = 0; i < 5; i++) { + const batch = []; + for (let j = 0; j < 3; j++) { + batch.push(Buffer.from(`${i}-${j} `)); + } + yield batch; + } + } + + const readable = toReadable(gen()); + const result = await collect(readable); + assert.strictEqual(result.toString(), + '0-0 0-1 0-2 1-0 1-1 1-2 2-0 2-1 2-2 3-0 3-1 3-2 4-0 4-1 4-2 '); +} + +// ============================================================================= +// fromStreamIter: backpressure with small highWaterMark +// ============================================================================= + +async function testBackpressureAsync() { + let batchCount = 0; + async function* gen() { + for (let i = 0; i < 10; i++) { + batchCount++; + yield [Buffer.from(`chunk${i}`)]; + } + } + + const readable = toReadable(gen(), { highWaterMark: 1 }); + + // Read one chunk at a time with delays to exercise backpressure + const chunks = []; + for await (const chunk of readable) { + chunks.push(chunk); + } + + assert.strictEqual(chunks.length, 10); + assert.strictEqual(batchCount, 10); +} + +// ============================================================================= +// fromStreamIter: source error mid-stream +// ============================================================================= + +async function testErrorAsync() { + async function* gen() { + yield [Buffer.from('ok')]; + throw new Error('source failed'); + } + + const readable = toReadable(gen()); + + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable) { + // Consume until error + } + }, { message: 'source failed' }); +} + +// ============================================================================= +// fromStreamIter: empty source +// ============================================================================= + +async function testEmptyAsync() { + async function* gen() { + // yields nothing + } + + const readable = toReadable(gen()); + const result = await collect(readable); + assert.strictEqual(result.length, 0); +} + +// ============================================================================= +// fromStreamIter: empty batches are skipped +// ============================================================================= + +async function testEmptyBatchAsync() { + async function* gen() { + yield []; + yield [Buffer.from('real data')]; + yield []; + } + + const readable = toReadable(gen()); + const result = await collect(readable); + assert.strictEqual(result.toString(), 'real data'); +} + +// ============================================================================= +// fromStreamIter: destroy mid-stream cleans up iterator +// ============================================================================= + +async function testDestroyAsync() { + let returnCalled = false; + async function* gen() { + try { + for (let i = 0; ; i++) { + yield [Buffer.from(`chunk${i}`)]; + } + } finally { + returnCalled = true; + } + } + + const readable = toReadable(gen()); + + // Read a couple chunks then destroy + const chunks = []; + await new Promise((resolve, reject) => { + readable.on('data', (chunk) => { + chunks.push(chunk); + if (chunks.length >= 3) { + readable.destroy(); + } + }); + readable.on('close', resolve); + readable.on('error', reject); + }); + + assert.ok(chunks.length >= 3); + assert.ok(returnCalled, 'iterator.return() should have been called'); +} + +// ============================================================================= +// fromStreamIter: destroy while pump is waiting on backpressure +// ============================================================================= + +async function testDestroyDuringBackpressure() { + let returnCalled = false; + async function* gen() { + try { + // Yield a large batch that will trigger backpressure with HWM=1 + yield [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')]; + // This should never be reached + yield [Buffer.from('d')]; + } finally { + returnCalled = true; + } + } + + const readable = toReadable(gen(), { highWaterMark: 1 }); + + // Read one chunk to start the pump, then destroy while it's waiting + const chunk = await new Promise((resolve) => { + readable.once('readable', () => resolve(readable.read())); + }); + assert.ok(chunk); + + // The pump should be waiting on backpressure now. Destroy the stream. + readable.destroy(); + + await new Promise((resolve) => readable.on('close', resolve)); + assert.ok(readable.destroyed); + assert.ok(returnCalled, 'iterator.return() should have been called'); +} + +// ============================================================================= +// fromStreamIter: large data integrity +// ============================================================================= + +async function testLargeDataAsync() { + const totalSize = 1024 * 1024; // 1 MB + const chunkSize = 16384; + + async function* gen() { + let remaining = totalSize; + while (remaining > 0) { + const size = Math.min(chunkSize, remaining); + yield [Buffer.alloc(size, 0x42)]; // Fill with 'B' + remaining -= size; + } + } + + const readable = toReadable(gen()); + const result = await collect(readable); + assert.strictEqual(result.length, totalSize); + assert.strictEqual(result[0], 0x42); + assert.strictEqual(result[totalSize - 1], 0x42); +} + +// ============================================================================= +// fromStreamIter: pipe to Writable +// ============================================================================= + +async function testPipeAsync() { + const source = from('pipe test data'); + const readable = toReadable(source); + + const chunks = []; + const writable = new Writable({ + write(chunk, enc, cb) { + chunks.push(chunk); + cb(); + }, + }); + + await new Promise((resolve, reject) => { + readable.pipe(writable); + writable.on('finish', resolve); + writable.on('error', reject); + }); + + assert.strictEqual(Buffer.concat(chunks).toString(), 'pipe test data'); +} + +// ============================================================================= +// fromStreamIter: not object mode +// ============================================================================= + +function testNotObjectMode() { + async function* gen() { yield [Buffer.from('x')]; } + const readable = toReadable(gen()); + assert.strictEqual(readable.readableObjectMode, false); +} + +// ============================================================================= +// fromStreamIter: invalid source throws +// ============================================================================= + +function testInvalidSourceAsync() { + assert.throws(() => toReadable(42), + { code: 'ERR_INVALID_ARG_TYPE' }); + assert.throws(() => toReadable('not iterable'), + { code: 'ERR_INVALID_ARG_TYPE' }); + assert.throws(() => toReadable(null), + { code: 'ERR_INVALID_ARG_TYPE' }); +} + +// ============================================================================= +// fromStreamIter: signal aborts the readable +// ============================================================================= + +async function testSignalAsync() { + async function* gen() { + for (let i = 0; ; i++) { + yield [Buffer.from(`chunk${i}`)]; + } + } + + const ac = new AbortController(); + const readable = toReadable(gen(), { signal: ac.signal }); + + const chunks = []; + await assert.rejects(async () => { + for await (const chunk of readable) { + chunks.push(chunk); + if (chunks.length >= 3) { + ac.abort(); + } + } + }, { name: 'AbortError' }); + assert.ok(chunks.length >= 3); + assert.ok(readable.destroyed); +} + +// ============================================================================= +// fromStreamIter: signal already aborted +// ============================================================================= + +async function testSignalAlreadyAborted() { + async function* gen() { + yield [Buffer.from('should not reach')]; + } + + const ac = new AbortController(); + ac.abort(); + const readable = toReadable(gen(), { signal: ac.signal }); + + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable) { + // Should not receive any data + } + }, { name: 'AbortError' }); + assert.ok(readable.destroyed); +} + +// ============================================================================= +// fromStreamIter: options validation +// ============================================================================= + +function testOptionsValidationAsync() { + async function* gen() { yield [Buffer.from('x')]; } + + // Options must be an object + assert.throws(() => toReadable(gen(), 'bad'), + { code: 'ERR_INVALID_ARG_TYPE' }); + assert.throws(() => toReadable(gen(), 42), + { code: 'ERR_INVALID_ARG_TYPE' }); + + // highWaterMark must be a non-negative integer + assert.throws(() => toReadable(gen(), { highWaterMark: -1 }), + { code: 'ERR_OUT_OF_RANGE' }); + assert.throws(() => toReadable(gen(), { highWaterMark: 'big' }), + { code: 'ERR_INVALID_ARG_TYPE' }); + assert.throws(() => toReadable(gen(), { highWaterMark: 1.5 }), + { code: 'ERR_OUT_OF_RANGE' }); + + // Signal must be an AbortSignal + assert.throws(() => toReadable(gen(), { signal: 'not a signal' }), + { code: 'ERR_INVALID_ARG_TYPE' }); + assert.throws(() => toReadable(gen(), { signal: 42 }), + { code: 'ERR_INVALID_ARG_TYPE' }); + + // Valid options should work + const r = toReadable(gen(), { highWaterMark: 0 }); + assert.strictEqual(r.readableHighWaterMark, 0); + r.destroy(); + + // Default highWaterMark should be 16KB + const r2 = toReadable(gen()); + assert.strictEqual(r2.readableHighWaterMark, 64 * 1024); + r2.destroy(); + + // Explicit undefined options should work (uses defaults) + const r3 = toReadable(gen(), undefined); + assert.strictEqual(r3.readableHighWaterMark, 64 * 1024); + r3.destroy(); +} + +// ============================================================================= +// fromStreamIter: with stream/iter transform +// ============================================================================= + +async function testWithTransformAsync() { + // Uppercase transform + function upper(chunks) { + if (chunks === null) return null; + return chunks.map((c) => { + const buf = Buffer.from(c); + for (let i = 0; i < buf.length; i++) { + if (buf[i] >= 97 && buf[i] <= 122) buf[i] -= 32; + } + return buf; + }); + } + + const source = pull(from('hello world'), upper); + const readable = toReadable(source); + const result = await collect(readable); + assert.strictEqual(result.toString(), 'HELLO WORLD'); +} + +// ============================================================================= +// fromStreamIterSync: basic sync round-trip +// ============================================================================= + +async function testBasicSync() { + const source = fromSync('sync hello'); + const readable = toReadableSync(source); + + assert.strictEqual(readable.readableObjectMode, false); + + const result = await collect(readable); + assert.strictEqual(result.toString(), 'sync hello'); +} + +// ============================================================================= +// fromStreamIterSync: synchronous read() returns data immediately +// ============================================================================= + +function testSyncRead() { + const source = fromSync('immediate'); + const readable = toReadableSync(source); + + // Synchronous read should return data right away + const chunk = readable.read(); + assert.ok(Buffer.isBuffer(chunk)); + assert.strictEqual(chunk.toString(), 'immediate'); +} + +// ============================================================================= +// fromStreamIterSync: backpressure with small highWaterMark +// ============================================================================= + +async function testBackpressureSync() { + function* gen() { + for (let i = 0; i < 10; i++) { + yield [Buffer.from(`s${i}`)]; + } + } + + const readable = toReadableSync(gen(), { highWaterMark: 1 }); + + const chunks = []; + for await (const chunk of readable) { + chunks.push(chunk); + } + + assert.strictEqual(chunks.length, 10); +} + +// ============================================================================= +// fromStreamIterSync: source error +// ============================================================================= + +async function testErrorSync() { + function* gen() { + yield [Buffer.from('ok')]; + throw new Error('sync source failed'); + } + + const readable = toReadableSync(gen()); + + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable) { + // Consume until error + } + }, { message: 'sync source failed' }); +} + +// ============================================================================= +// fromStreamIterSync: empty source +// ============================================================================= + +function testEmptySync() { + function* gen() { + // yields nothing + } + + const readable = toReadableSync(gen()); + const result = readable.read(); + assert.strictEqual(result, null); +} + +// ============================================================================= +// fromStreamIterSync: destroy calls iterator.return() +// ============================================================================= + +async function testDestroySync() { + let returnCalled = false; + function* gen() { + try { + for (let i = 0; ; i++) { + yield [Buffer.from(`chunk${i}`)]; + } + } finally { + returnCalled = true; + } + } + + const readable = toReadableSync(gen()); + readable.read(); // Start iteration + readable.destroy(); + + await new Promise((resolve) => readable.on('close', resolve)); + assert.ok(returnCalled, 'iterator.return() should have been called'); +} + +// ============================================================================= +// fromStreamIterSync: not object mode +// ============================================================================= + +function testNotObjectModeSync() { + function* gen() { yield [Buffer.from('x')]; } + const readable = toReadableSync(gen()); + assert.strictEqual(readable.readableObjectMode, false); +} + +// ============================================================================= +// fromStreamIterSync: invalid source throws +// ============================================================================= + +function testInvalidSourceSync() { + assert.throws(() => toReadableSync(42), + { code: 'ERR_INVALID_ARG_TYPE' }); + assert.throws(() => toReadableSync(null), + { code: 'ERR_INVALID_ARG_TYPE' }); +} + +// ============================================================================= +// fromStreamIterSync: options validation +// ============================================================================= + +function testOptionsValidationSync() { + function* gen() { yield [Buffer.from('x')]; } + + // Options must be an object + assert.throws(() => toReadableSync(gen(), 'bad'), + { code: 'ERR_INVALID_ARG_TYPE' }); + assert.throws(() => toReadableSync(gen(), 42), + { code: 'ERR_INVALID_ARG_TYPE' }); + + // highWaterMark must be a non-negative integer + assert.throws(() => toReadableSync(gen(), { highWaterMark: -1 }), + { code: 'ERR_OUT_OF_RANGE' }); + assert.throws(() => toReadableSync(gen(), { highWaterMark: 'big' }), + { code: 'ERR_INVALID_ARG_TYPE' }); + assert.throws(() => toReadableSync(gen(), { highWaterMark: 1.5 }), + { code: 'ERR_OUT_OF_RANGE' }); + + // Valid options should work + const r = toReadableSync(gen(), { highWaterMark: 0 }); + assert.strictEqual(r.readableHighWaterMark, 0); + r.destroy(); + + // Default highWaterMark should be 16KB + const r2 = toReadableSync(gen()); + assert.strictEqual(r2.readableHighWaterMark, 64 * 1024); + r2.destroy(); +} + +// ============================================================================= +// Round-trip: stream/iter -> Readable -> stream/iter +// ============================================================================= + +async function testRoundTrip() { + const original = 'round trip data test'; + + // stream/iter -> classic Readable + const source = from(original); + const readable = toReadable(source); + + // classic Readable -> stream/iter (via toAsyncStreamable) + const result = await text(from(readable)); + assert.strictEqual(result, original); +} + +// ============================================================================= +// Round-trip with compression +// ============================================================================= + +async function testRoundTripWithCompression() { + const { compressGzip, decompressGzip } = require('zlib/iter'); + + const original = 'compress through classic readable'; + + // Compress via stream/iter, bridge to classic Readable + const compressed = pull(from(original), compressGzip()); + const readable = toReadable(compressed); + + // Classic Readable back to stream/iter for decompression + const result = await text(pull(from(readable), decompressGzip())); + assert.strictEqual(result, original); +} + +// ============================================================================= +// Run all tests +// ============================================================================= + +testNotObjectMode(); +testNotObjectModeSync(); +testSyncRead(); +testEmptySync(); +testInvalidSourceAsync(); +testInvalidSourceSync(); +testOptionsValidationAsync(); +testOptionsValidationSync(); + +Promise.all([ + testBasicAsync(), + testMultiBatchAsync(), + testBackpressureAsync(), + testErrorAsync(), + testEmptyAsync(), + testEmptyBatchAsync(), + testDestroyAsync(), + testDestroyDuringBackpressure(), + testSignalAsync(), + testSignalAlreadyAborted(), + testLargeDataAsync(), + testPipeAsync(), + testWithTransformAsync(), + testBasicSync(), + testBackpressureSync(), + testErrorSync(), + testDestroySync(), + testRoundTrip(), + testRoundTripWithCompression(), +]).then(common.mustCall()); diff --git a/test/parallel/test-stream-iter-writable-from.js b/test/parallel/test-stream-iter-writable-from.js new file mode 100644 index 00000000000000..fd922c5cf99537 --- /dev/null +++ b/test/parallel/test-stream-iter-writable-from.js @@ -0,0 +1,596 @@ +// Flags: --experimental-stream-iter +'use strict'; + +// Tests for toWritable() - creating a classic stream.Writable +// backed by a stream/iter Writer. + +const common = require('../common'); +const assert = require('assert'); +const { + push, + text, + toWritable, +} = require('stream/iter'); + +// ============================================================================= +// Basic: write through fromStreamIter writable, read from readable +// ============================================================================= + +async function testBasicWrite() { + const { writer, readable } = push({ backpressure: 'block' }); + const writable = toWritable(writer); + + writable.write('hello'); + writable.write(' world'); + writable.end(); + + const result = await text(readable); + assert.strictEqual(result, 'hello world'); +} + +// ============================================================================= +// _write delegates to writer.write() +// ============================================================================= + +async function testWriteDelegatesToWriter() { + const chunks = []; + // Create a minimal Writer that records writes. + const writer = { + write(chunk) { + chunks.push(Buffer.from(chunk)); + return Promise.resolve(); + }, + end() { return Promise.resolve(0); }, + fail() {}, + }; + + const writable = toWritable(writer); + + await new Promise((resolve, reject) => { + writable.write('hello', (err) => { + if (err) reject(err); + else resolve(); + }); + }); + + assert.strictEqual(Buffer.concat(chunks).toString(), 'hello'); +} + +// ============================================================================= +// _writev delegates to writer.writev() when available +// ============================================================================= + +async function testWritevDelegation() { + const batches = []; + const writer = { + write(chunk) { + return Promise.resolve(); + }, + writev(chunks) { + batches.push(chunks.map((c) => Buffer.from(c))); + return Promise.resolve(); + }, + writevSync(chunks) { + return false; + }, + end() { return Promise.resolve(0); }, + fail() {}, + }; + + const writable = toWritable(writer); + + // Cork to batch writes, then uncork to trigger _writev + writable.cork(); + writable.write('a'); + writable.write('b'); + writable.write('c'); + writable.uncork(); + + await new Promise((resolve) => writable.end(resolve)); + + // Writev should have been called with the batched chunks + assert.ok(batches.length > 0, 'writev should have been called'); +} + +// ============================================================================= +// _writev not defined when writer lacks writev +// ============================================================================= + +function testNoWritevWithoutWriterWritev() { + const writer = { + write(chunk) { return Promise.resolve(); }, + }; + + const writable = toWritable(writer); + // The _writev should be null (Writable default) when writer lacks writev + assert.strictEqual(writable._writev, null); +} + +// ============================================================================= +// Try-sync-first: writeSync is attempted before write +// ============================================================================= + +async function testWriteSyncFirst() { + let syncCalled = false; + let asyncCalled = false; + + const writer = { + writeSync(chunk) { + syncCalled = true; + return true; // Sync path accepted + }, + write(chunk) { + asyncCalled = true; + return Promise.resolve(); + }, + end() { return Promise.resolve(0); }, + fail() {}, + }; + + const writable = toWritable(writer); + + await new Promise((resolve) => { + writable.write('test', resolve); + }); + + assert.ok(syncCalled, 'writeSync should have been called'); + assert.ok(!asyncCalled, 'write should not have been called'); +} + +// ============================================================================= +// Try-sync-first: falls back to async when writeSync returns false +// ============================================================================= + +async function testWriteSyncFallback() { + let syncCalled = false; + let asyncCalled = false; + + const writer = { + writeSync(chunk) { + syncCalled = true; + return false; // Sync path rejected + }, + write(chunk) { + asyncCalled = true; + return Promise.resolve(); + }, + end() { return Promise.resolve(0); }, + fail() {}, + }; + + const writable = toWritable(writer); + + await new Promise((resolve) => { + writable.write('test', resolve); + }); + + assert.ok(syncCalled, 'writeSync should have been called'); + assert.ok(asyncCalled, 'write should have been called as fallback'); +} + +// ============================================================================= +// Try-sync-first: endSync attempted before end +// ============================================================================= + +async function testEndSyncFirst() { + let endSyncCalled = false; + let endAsyncCalled = false; + + const writer = { + write(chunk) { return Promise.resolve(); }, + endSync() { + endSyncCalled = true; + return 5; // Success, returns byte count + }, + end() { + endAsyncCalled = true; + return Promise.resolve(5); + }, + fail() {}, + }; + + const writable = toWritable(writer); + + await new Promise((resolve) => writable.end(resolve)); + + assert.ok(endSyncCalled, 'endSync should have been called'); + assert.ok(!endAsyncCalled, 'end should not have been called'); +} + +// ============================================================================= +// Try-sync-first: endSync returns -1, falls back to async end +// ============================================================================= + +async function testEndSyncFallback() { + let endSyncCalled = false; + let endAsyncCalled = false; + + const writer = { + write(chunk) { return Promise.resolve(); }, + endSync() { + endSyncCalled = true; + return -1; // Can't complete synchronously + }, + end() { + endAsyncCalled = true; + return Promise.resolve(0); + }, + fail() {}, + }; + + const writable = toWritable(writer); + + await new Promise((resolve) => writable.end(resolve)); + + assert.ok(endSyncCalled, 'endSync should have been called'); + assert.ok(endAsyncCalled, 'end should have been called as fallback'); +} + +// ============================================================================= +// _final delegates to writer.end() +// ============================================================================= + +async function testFinalDelegatesToEnd() { + let endCalled = false; + const writer = { + write(chunk) { return Promise.resolve(); }, + end() { + endCalled = true; + return Promise.resolve(0); + }, + fail() {}, + }; + + const writable = toWritable(writer); + + await new Promise((resolve) => writable.end(resolve)); + + assert.ok(endCalled, 'writer.end() should have been called'); +} + +// ============================================================================= +// _destroy delegates to writer.fail() +// ============================================================================= + +async function testDestroyDelegatesToFail() { + let failReason = null; + const writer = { + write(chunk) { return Promise.resolve(); }, + end() { return Promise.resolve(0); }, + fail(reason) { failReason = reason; }, + }; + + const writable = toWritable(writer); + writable.on('error', () => {}); // Prevent unhandled + + const testErr = new Error('destroy test'); + writable.destroy(testErr); + + // Give a tick for destroy to propagate + await new Promise((resolve) => setTimeout(resolve, 10)); + + assert.strictEqual(failReason, testErr); +} + +// ============================================================================= +// Error from writer.write() propagates to writable +// ============================================================================= + +async function testWriteErrorPropagation() { + const writer = { + write(chunk) { + return Promise.reject(new Error('write failed')); + }, + end() { return Promise.resolve(0); }, + fail() {}, + }; + + const writable = toWritable(writer); + + await assert.rejects(new Promise((resolve, reject) => { + writable.write('data', (err) => { + if (err) reject(err); + else resolve(); + }); + }), { message: 'write failed' }); +} + +// ============================================================================= +// Invalid writer argument throws +// ============================================================================= + +function testInvalidWriterThrows() { + assert.throws( + () => toWritable(null), + { code: 'ERR_INVALID_ARG_TYPE' }, + ); + assert.throws( + () => toWritable({}), + { code: 'ERR_INVALID_ARG_TYPE' }, + ); + assert.throws( + () => toWritable('not a writer'), + { code: 'ERR_INVALID_ARG_TYPE' }, + ); + // Object with write is valid (only write is required). + // This should not throw. + toWritable({ + write() { return Promise.resolve(); }, + }); +} + +// ============================================================================= +// Round-trip: push writer -> fromStreamIter -> write -> read from readable +// ============================================================================= + +async function testRoundTrip() { + const { writer, readable } = push({ backpressure: 'block' }); + const writable = toWritable(writer); + + const data = 'round trip test data'; + writable.write(data); + writable.end(); + + const result = await text(readable); + assert.strictEqual(result, data); +} + +// ============================================================================= +// Multiple sequential writes +// ============================================================================= + +async function testSequentialWrites() { + const { writer, readable } = push({ backpressure: 'block' }); + const writable = toWritable(writer); + + for (let i = 0; i < 10; i++) { + writable.write(`chunk${i}`); + } + writable.end(); + + let expected = ''; + for (let i = 0; i < 10; i++) { + expected += `chunk${i}`; + } + const result = await text(readable); + assert.strictEqual(result, expected); +} + +// ============================================================================= +// Sync callback is deferred via queueMicrotask +// ============================================================================= + +async function testSyncCallbackDeferred() { + let callbackTick = false; + + const writer = { + writeSync(chunk) { + return true; + }, + write(chunk) { + return Promise.resolve(); + }, + end() { return Promise.resolve(0); }, + fail() {}, + }; + + const writable = toWritable(writer); + + const p = new Promise((resolve) => { + writable.write('test', () => { + callbackTick = true; + resolve(); + }); + // Callback should NOT have fired synchronously + assert.strictEqual(callbackTick, false); + }); + + await p; + assert.strictEqual(callbackTick, true); +} + +// ============================================================================= +// Minimal writer: only write() is required +// ============================================================================= + +async function testMinimalWriter() { + const chunks = []; + const writer = { + write(chunk) { + chunks.push(Buffer.from(chunk)); + return Promise.resolve(); + }, + // No end, fail, writeSync, writev, etc. + }; + + const writable = toWritable(writer); + + await new Promise((resolve) => { + writable.write('minimal'); + writable.end(resolve); + }); + + assert.strictEqual(Buffer.concat(chunks).toString(), 'minimal'); +} + +// ============================================================================= +// Destroy without error does not call fail() +// ============================================================================= + +async function testDestroyWithoutError() { + let failCalled = false; + const writer = { + write(chunk) { return Promise.resolve(); }, + fail() { failCalled = true; }, + }; + + const writable = toWritable(writer); + writable.destroy(); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + assert.ok(!failCalled, 'fail should not be called on clean destroy'); +} + +// ============================================================================= +// Destroy with error calls fail() when available +// ============================================================================= + +async function testDestroyWithError() { + let failReason = null; + const writer = { + write(chunk) { return Promise.resolve(); }, + fail(reason) { failReason = reason; }, + }; + + const writable = toWritable(writer); + writable.on('error', () => {}); + + const err = new Error('test'); + writable.destroy(err); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + assert.strictEqual(failReason, err); +} + +// ============================================================================= +// Destroy with error when writer lacks fail() +// ============================================================================= + +async function testDestroyWithoutFail() { + const writer = { + write(chunk) { return Promise.resolve(); }, + // No fail method + }; + + const writable = toWritable(writer); + writable.on('error', () => {}); + + // Should not throw even though writer has no fail() + writable.destroy(new Error('test')); + + await new Promise((resolve) => setTimeout(resolve, 10)); + assert.ok(writable.destroyed); +} + +// ============================================================================= +// Custom highWaterMark option +// ============================================================================= + +function testHighWaterMarkIsMaxSafeInt() { + const writer = { + write(chunk) { return Promise.resolve(); }, + }; + + // HWM is set to MAX_SAFE_INTEGER to disable Writable's internal + // buffering. The underlying Writer manages backpressure directly. + const writable = toWritable(writer); + assert.strictEqual(writable.writableHighWaterMark, Number.MAX_SAFE_INTEGER); +} + +// ============================================================================= +// writeSync throws -- falls back to async +// ============================================================================= + +async function testWriteSyncThrowsFallback() { + let asyncCalled = false; + + const writer = { + writeSync() { + throw new Error('sync broken'); + }, + write(chunk) { + asyncCalled = true; + return Promise.resolve(); + }, + end() { return Promise.resolve(0); }, + fail() {}, + }; + + const writable = toWritable(writer); + + await new Promise((resolve) => { + writable.write('test', resolve); + }); + + assert.ok(asyncCalled, 'async write should be called as fallback'); +} + +// ============================================================================= +// ============================================================================= +// writer.write() throws synchronously -- error propagates to callback +// ============================================================================= + +async function testWriteThrowsSyncPropagation() { + const writer = { + write() { + throw new Error('sync throw from write'); + }, + }; + + const writable = toWritable(writer); + + await assert.rejects(new Promise((resolve, reject) => { + writable.write('data', (err) => { + if (err) reject(err); + else resolve(); + }); + }), { message: 'sync throw from write' }); +} + +// ============================================================================= +// writer.end() throws synchronously -- error propagates to callback +// ============================================================================= + +async function testEndThrowsSyncPropagation() { + const writer = { + write(chunk) { return Promise.resolve(); }, + endSync() { return -1; }, + end() { + throw new Error('sync throw from end'); + }, + }; + + const writable = toWritable(writer); + writable.on('error', () => {}); + + await new Promise((resolve) => { + writable.end(common.mustCall((err) => { + assert.ok(err); + assert.strictEqual(err.message, 'sync throw from end'); + resolve(); + })); + }); +} + +// ============================================================================= +// Run all tests +// ============================================================================= + +testInvalidWriterThrows(); +testNoWritevWithoutWriterWritev(); +testHighWaterMarkIsMaxSafeInt(); + +Promise.all([ + testBasicWrite(), + testWriteDelegatesToWriter(), + testWritevDelegation(), + testWriteSyncFirst(), + testWriteSyncFallback(), + testWriteSyncThrowsFallback(), + testEndSyncFirst(), + testEndSyncFallback(), + testFinalDelegatesToEnd(), + testDestroyDelegatesToFail(), + testDestroyWithoutError(), + testDestroyWithError(), + testDestroyWithoutFail(), + testWriteErrorPropagation(), + testWriteThrowsSyncPropagation(), + testEndThrowsSyncPropagation(), + testRoundTrip(), + testSequentialWrites(), + testSyncCallbackDeferred(), + testMinimalWriter(), +]).then(common.mustCall()); diff --git a/test/parallel/test-stream-iter-writable-interop.js b/test/parallel/test-stream-iter-writable-interop.js new file mode 100644 index 00000000000000..8a2ead0d0ee579 --- /dev/null +++ b/test/parallel/test-stream-iter-writable-interop.js @@ -0,0 +1,667 @@ +// Flags: --experimental-stream-iter +'use strict'; + +// Tests for classic Writable stream interop with the stream/iter API +// via fromWritable(). + +const common = require('../common'); +const assert = require('assert'); +const { Writable } = require('stream'); +const { + from, + fromWritable, + pipeTo, + text, + ondrain, +} = require('stream/iter'); + +// ============================================================================= +// fromWritable() is exported from stream/iter +// ============================================================================= + +function testFunctionExists() { + assert.strictEqual(typeof fromWritable, 'function'); +} + +// ============================================================================= +// Default policy is strict +// ============================================================================= + +async function testDefaultIsStrict() { + const writable = new Writable({ + highWaterMark: 1024, + write(chunk, encoding, cb) { cb(); }, + }); + + const writer = fromWritable(writable); + // Should work fine when buffer has room + await writer.write('hello'); + await writer.end(); +} + +// ============================================================================= +// Basic write: pipeTo through the adapter (block policy for pipeTo compat) +// ============================================================================= + +async function testBasicWrite() { + const chunks = []; + const writable = new Writable({ + write(chunk, encoding, cb) { + chunks.push(Buffer.from(chunk)); + cb(); + }, + }); + + const writer = fromWritable(writable, { backpressure: 'block' }); + await pipeTo(from('hello world'), writer); + + assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world'); +} + +// ============================================================================= +// write() resolves when no backpressure (strict) +// ============================================================================= + +async function testWriteNoDrain() { + const chunks = []; + const writable = new Writable({ + highWaterMark: 1024, + write(chunk, encoding, cb) { + chunks.push(Buffer.from(chunk)); + cb(); + }, + }); + + const writer = fromWritable(writable); + await writer.write('hello'); + await writer.write(' world'); + await writer.end(); + + assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world'); +} + +// ============================================================================= +// block: write() waits for drain when backpressure is active +// ============================================================================= + +async function testBlockWaitsForDrain() { + const chunks = []; + const writable = new Writable({ + highWaterMark: 1, // Very small buffer + write(chunk, encoding, cb) { + chunks.push(Buffer.from(chunk)); + // Delay callback to simulate slow consumer + setTimeout(cb, 10); + }, + }); + + const writer = fromWritable(writable, { backpressure: 'block' }); + + await writer.write('a'); + await writer.write('b'); + await writer.write('c'); + await writer.end(); + + assert.strictEqual(Buffer.concat(chunks).toString(), 'abc'); +} + +// ============================================================================= +// block: stream error rejects pending write +// ============================================================================= + +async function testBlockErrorRejectsPendingWrite() { + const writable = new Writable({ + highWaterMark: 1, + write(chunk, enc, cb) { + // Never call cb -- simulate stuck write + }, + }); + + const writer = fromWritable(writable, { backpressure: 'block' }); + + // First write fills the buffer, waits for drain + const writePromise = writer.write('data that will block'); + + // Destroy with error while write is pending + writable.destroy(new Error('stream broke')); + + await assert.rejects(writePromise, { message: 'stream broke' }); +} + +// ============================================================================= +// strict: rejects when buffer is full +// ============================================================================= + +async function testStrictRejectsWhenFull() { + const writable = new Writable({ + highWaterMark: 5, + write(chunk, enc, cb) { + // Never call cb -- data stays buffered + }, + }); + + const writer = fromWritable(writable); + + // First write fills the buffer (5 bytes = hwm) + await writer.write('12345'); + + // Second write should reject -- buffer is full + await assert.rejects( + writer.write('more'), + { code: 'ERR_INVALID_STATE' }, + ); +} + +// ============================================================================= +// strict: writev rejects when buffer is full +// ============================================================================= + +async function testStrictWritevRejectsWhenFull() { + const writable = new Writable({ + highWaterMark: 5, + write(chunk, enc, cb) { + // Never call cb + }, + }); + + const writer = fromWritable(writable); + + // Fill buffer + await writer.write('12345'); + + // Writev should reject entire batch + await assert.rejects( + writer.writev([ + new TextEncoder().encode('a'), + new TextEncoder().encode('b'), + ]), + { code: 'ERR_INVALID_STATE' }, + ); +} + +// ============================================================================= +// drop-newest: silently discards when buffer is full +// ============================================================================= + +async function testDropNewestDiscards() { + const chunks = []; + const writable = new Writable({ + highWaterMark: 5, + write(chunk, enc, cb) { + chunks.push(Buffer.from(chunk)); + // Never call cb -- data stays buffered + }, + }); + + const writer = fromWritable(writable, { backpressure: 'drop-newest' }); + + // First write fills the buffer + await writer.write('12345'); + + // Second write should be silently discarded (no reject, no block) + await writer.write('dropped'); + + // Only the first chunk was actually written to the writable + assert.strictEqual(chunks.length, 1); + assert.strictEqual(chunks[0].toString(), '12345'); +} + +// ============================================================================= +// drop-newest: writev discards entire batch when full +// ============================================================================= + +async function testDropNewestWritevDiscards() { + const chunks = []; + const writable = new Writable({ + highWaterMark: 5, + write(chunk, enc, cb) { + chunks.push(Buffer.from(chunk)); + // Never call cb + }, + }); + + const writer = fromWritable(writable, { backpressure: 'drop-newest' }); + + // Fill buffer + await writer.write('12345'); + + // Writev should discard entire batch + await writer.writev([ + new TextEncoder().encode('a'), + new TextEncoder().encode('b'), + ]); + + assert.strictEqual(chunks.length, 1); +} + +// ============================================================================= +// drop-newest: still counts bytes from dropped writes +// ============================================================================= + +async function testDropNewestCountsBytes() { + const writable = new Writable({ + highWaterMark: 5, + write(chunk, enc, cb) { + // Never call cb + }, + }); + + const writer = fromWritable(writable, { backpressure: 'drop-newest' }); + + await writer.write('12345'); // 5 bytes, accepted + await writer.write('67890'); // 5 bytes, dropped + + // desiredSize should be 0 (buffer is full) + assert.strictEqual(writer.desiredSize, 0); +} + +// ============================================================================= +// drop-oldest: throws on construction +// ============================================================================= + +function testDropOldestThrows() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + assert.throws( + () => fromWritable(writable, { backpressure: 'drop-oldest' }), + { code: 'ERR_INVALID_ARG_VALUE' }, + ); +} + +// ============================================================================= +// Invalid backpressure value throws +// ============================================================================= + +function testInvalidBackpressureThrows() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + assert.throws( + () => fromWritable(writable, { backpressure: 'invalid' }), + { code: 'ERR_INVALID_ARG_VALUE' }, + ); +} + +// ============================================================================= +// writev() corks and uncorks (block policy) +// ============================================================================= + +async function testWritev() { + const chunks = []; + const writable = new Writable({ + highWaterMark: 1024, + write(chunk, encoding, cb) { + chunks.push(Buffer.from(chunk)); + cb(); + }, + writev(entries, cb) { + for (const { chunk } of entries) { + chunks.push(Buffer.from(chunk)); + } + cb(); + }, + }); + + const writer = fromWritable(writable, { backpressure: 'block' }); + await writer.writev([ + new TextEncoder().encode('hello'), + new TextEncoder().encode(' '), + new TextEncoder().encode('world'), + ]); + await writer.end(); + + assert.strictEqual(Buffer.concat(chunks).toString(), 'hello world'); +} + +// ============================================================================= +// writeSync / writevSync always return false +// ============================================================================= + +function testSyncMethodsReturnFalse() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + assert.strictEqual(writer.writeSync(new Uint8Array(1)), false); + assert.strictEqual(writer.writevSync([new Uint8Array(1)]), false); +} + +// ============================================================================= +// endSync returns -1 +// ============================================================================= + +function testEndSyncReturnsNegativeOne() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + assert.strictEqual(writer.endSync(), -1); +} + +// ============================================================================= +// end() resolves with total bytes written +// ============================================================================= + +async function testEndReturnsByteCount() { + const writable = new Writable({ + write(chunk, encoding, cb) { cb(); }, + }); + + const writer = fromWritable(writable); + await writer.write('hello'); // 5 bytes + await writer.write(' world'); // 6 bytes + const total = await writer.end(); + + assert.strictEqual(total, 11); +} + +// ============================================================================= +// fail() destroys the writable +// ============================================================================= + +async function testFail() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + writable.on('error', () => {}); // Prevent unhandled error + const writer = fromWritable(writable); + + writer.fail(new Error('test fail')); + + assert.ok(writable.destroyed); +} + +// ============================================================================= +// desiredSize reflects buffer state +// ============================================================================= + +function testDesiredSize() { + const writable = new Writable({ + highWaterMark: 100, + write(chunk, enc, cb) { + // Don't call cb - keeps data buffered + }, + }); + + const writer = fromWritable(writable); + assert.strictEqual(writer.desiredSize, 100); +} + +// ============================================================================= +// desiredSize is null when destroyed +// ============================================================================= + +function testDesiredSizeNull() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + writable.destroy(); + assert.strictEqual(writer.desiredSize, null); +} + +// ============================================================================= +// drainableProtocol: resolves immediately when no backpressure +// ============================================================================= + +async function testDrainableNoPressure() { + const writable = new Writable({ + highWaterMark: 1024, + write(chunk, enc, cb) { cb(); }, + }); + + const writer = fromWritable(writable); + const result = await ondrain(writer); + assert.strictEqual(result, true); +} + +// ============================================================================= +// drainableProtocol: returns null when destroyed +// ============================================================================= + +function testDrainableNull() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + writable.destroy(); + assert.strictEqual(ondrain(writer), null); +} + +// ============================================================================= +// Error propagation: write after end rejects +// ============================================================================= + +async function testWriteAfterEnd() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + await writer.end(); + + await assert.rejects( + writer.write('should fail'), + { code: 'ERR_STREAM_WRITE_AFTER_END' }, + ); +} + +// ============================================================================= +// Multiple sequential writes +// ============================================================================= + +async function testSequentialWrites() { + const chunks = []; + const writable = new Writable({ + write(chunk, encoding, cb) { + chunks.push(Buffer.from(chunk)); + cb(); + }, + }); + + const writer = fromWritable(writable); + + for (let i = 0; i < 10; i++) { + await writer.write(`chunk${i}`); + } + await writer.end(); + + let expected = ''; + for (let i = 0; i < 10; i++) { + expected += `chunk${i}`; + } + assert.strictEqual(Buffer.concat(chunks).toString(), expected); +} + +// ============================================================================= +// pipeTo with compression transform into writable (block policy) +// ============================================================================= + +async function testPipeToWithTransform() { + const { + compressGzip, + decompressGzip, + } = require('zlib/iter'); + const { pull } = require('stream/iter'); + + const compressed = []; + const writable = new Writable({ + write(chunk, encoding, cb) { + compressed.push(Buffer.from(chunk)); + cb(); + }, + }); + + const writer = fromWritable(writable, { backpressure: 'block' }); + await pipeTo(from('hello via transform'), compressGzip(), writer); + + const decompressed = await text( + pull(from(Buffer.concat(compressed)), decompressGzip()), + ); + assert.strictEqual(decompressed, 'hello via transform'); +} + +// ============================================================================= +// Dispose support +// ============================================================================= + +async function testDispose() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + writer[Symbol.dispose](); + assert.ok(writable.destroyed); +} + +async function testAsyncDispose() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + await writer[Symbol.asyncDispose](); + assert.ok(writable.destroyed); +} + +// ============================================================================= +// write() validates chunk type +// ============================================================================= + +async function testWriteInvalidChunkType() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + await assert.rejects( + writer.write(42), + { code: 'ERR_INVALID_ARG_TYPE' }, + ); + await assert.rejects( + writer.write(null), + { code: 'ERR_INVALID_ARG_TYPE' }, + ); + await assert.rejects( + writer.write({}), + { code: 'ERR_INVALID_ARG_TYPE' }, + ); +} + +// ============================================================================= +// writev() validates chunks is an array +// ============================================================================= + +function testWritevInvalidChunksType() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer = fromWritable(writable); + + assert.throws( + () => writer.writev('not an array'), + { code: 'ERR_INVALID_ARG_TYPE' }, + ); + assert.throws( + () => writer.writev(42), + { code: 'ERR_INVALID_ARG_TYPE' }, + ); +} + +// ============================================================================= +// Cached writer: second call returns same instance +// ============================================================================= + +function testCachedWriter() { + const writable = new Writable({ write(chunk, enc, cb) { cb(); } }); + const writer1 = fromWritable(writable); + const writer2 = fromWritable(writable); + + assert.strictEqual(writer1, writer2); +} + +// ============================================================================= +// fail() rejects pending block waiters +// ============================================================================= + +async function testFailRejectsPendingWaiters() { + const writable = new Writable({ + highWaterMark: 1, + write(chunk, enc, cb) { + // Never call cb -- stuck + }, + }); + writable.on('error', () => {}); // Prevent unhandled error + + const writer = fromWritable(writable, { backpressure: 'block' }); + + // This write will block on drain + const writePromise = writer.write('blocked data'); + + // fail() should reject the pending waiter, not orphan it + writer.fail(new Error('fail reason')); + + await assert.rejects(writePromise, { message: 'fail reason' }); +} + +// ============================================================================= +// dispose rejects pending block waiters +// ============================================================================= + +async function testDisposeRejectsPendingWaiters() { + const writable = new Writable({ + highWaterMark: 1, + write(chunk, enc, cb) { + // Never call cb -- stuck + }, + }); + + const writer = fromWritable(writable, { backpressure: 'block' }); + + // This write will block on drain + const writePromise = writer.write('blocked data'); + + writer[Symbol.dispose](); + + await assert.rejects(writePromise, { name: 'AbortError' }); +} + +// ============================================================================= +// Run all tests +// ============================================================================= + +testFunctionExists(); +testSyncMethodsReturnFalse(); +// ============================================================================= +// Object-mode Writable throws +// ============================================================================= + +function testObjectModeThrows() { + const writable = new Writable({ + objectMode: true, + write(chunk, enc, cb) { cb(); }, + }); + assert.throws( + () => fromWritable(writable), + { code: 'ERR_INVALID_STATE' }, + ); +} + +testFunctionExists(); +testSyncMethodsReturnFalse(); +testEndSyncReturnsNegativeOne(); +testDesiredSize(); +testDesiredSizeNull(); +testDrainableNull(); +testDropOldestThrows(); +testInvalidBackpressureThrows(); +testWritevInvalidChunksType(); +testCachedWriter(); +testObjectModeThrows(); + +Promise.all([ + testDefaultIsStrict(), + testBasicWrite(), + testWriteNoDrain(), + testBlockWaitsForDrain(), + testBlockErrorRejectsPendingWrite(), + testStrictRejectsWhenFull(), + testStrictWritevRejectsWhenFull(), + testDropNewestDiscards(), + testDropNewestWritevDiscards(), + testDropNewestCountsBytes(), + testWritev(), + testEndReturnsByteCount(), + testFail(), + testDrainableNoPressure(), + testWriteAfterEnd(), + testSequentialWrites(), + testPipeToWithTransform(), + testDispose(), + testAsyncDispose(), + testWriteInvalidChunkType(), + testFailRejectsPendingWaiters(), + testDisposeRejectsPendingWaiters(), +]).then(common.mustCall());