Skip to content

stream: stream/iter node.js streams interop#62469

Open
jasnell wants to merge 4 commits intonodejs:mainfrom
jasnell:jasnell/new-streams-classic-interop
Open

stream: stream/iter node.js streams interop#62469
jasnell wants to merge 4 commits intonodejs:mainfrom
jasnell:jasnell/new-streams-classic-interop

Conversation

@jasnell
Copy link
Copy Markdown
Member

@jasnell jasnell commented Mar 28, 2026

Implementing prototype integration points for experimental stream/iter and node.js stream.Readable/stream.Writable

Five utility functions are exported from stream/iter to bridge between classic Node.js streams and the iterable streams API. These are available as require('node:stream/iter').fromReadable, etc.

From classic to stream/iter

fromReadable(readable) — Converts a classic stream.Readable (or duck-typed object with read() and on()) into an AsyncIterable<Uint8Array[]> suitable for use with from(), pull(), text(), etc.

  • If the object implements the toAsyncStreamable protocol (as stream.Readable does via its prototype), that path is used.
  • Otherwise, duck-types on read() + EventEmitter and wraps with the same batched async iterator logic.
  • Object-mode and encoded Readable streams are automatically normalized to Uint8Array.
  • Results are cached per instance via WeakMap.
    fromWritable(writable[, options]) — Converts a classic stream.Writable (or duck-typed object with write() and on()) into a stream/iter Writer adapter suitable for use with pipeTo().
  • Supports backpressure policies: 'strict' (default — rejects on full buffer), 'block' (waits for drain), 'drop-newest' (silently discards). 'drop-oldest' is rejected because the Writable's internal buffer has no batch boundaries — a writev() fans out into individual entries, so evicting oldest entries could partially tear apart an earlier atomic batch.
  • Since classic Writable writes are fundamentally async, writeSync/writevSync always return false and endSync returns -1.
  • Falls back to sensible defaults for duck-typed streams missing writableHighWaterMark, writableLength, etc.
  • Object-mode writables are rejected (Writer interface is bytes-only).
  • Backpressure uses a single persistent 'drain'/'error' listener pair (not per-write listeners) with a waiters list that is swapped on resolution for O(n) cleanup.
  • Results are cached per instance via WeakMap.
  • Listeners are cleaned up on end(), fail(), and dispose.

From stream/iter to classic

toReadable(source[, options]) — Creates a byte-mode stream.Readable from an AsyncIterable<Uint8Array[]>. Each batch's chunks are pushed individually. Supports highWaterMark and signal options. Uses an async pump with PromiseWithResolvers-based backpressure.

toReadableSync(source[, options]) — Same but for synchronous Iterable<Uint8Array[]>. The _read() method pulls from the iterator directly.

toWritable(writer) — Creates a stream.Writable backed by a stream/iter Writer. Uses the try-sync-first pattern: _write attempts writeSync before falling back to await write(), _writev attempts writevSync before await writev(), _final attempts endSync before await end(). Sync-success callbacks are deferred via queueMicrotask to preserve Writable's async callback contract. The Writable's highWaterMark is set to Number.MAX_SAFE_INTEGER to disable its internal buffering — the underlying Writer manages backpressure directly. Only write() is required on the Writer; end(), fail(), writev(), and all sync variants are optional.

Protocol: Readable.prototype[Symbol.for('Stream.toAsyncStreamable')]

Remains on the Readable prototype so that from(readable) discovers it automatically. Delegates to createBatchedAsyncIterator (shared with fromReadable's duck-type path) to avoid code duplication. The protocol method calls the shared helper directly — it does NOT call fromReadable(), which would create infinite recursion since fromReadable checks for the protocol.

Signed-off-by: James M Snell jasnell@gmail.com
Assisted-by: Opencode/Opus 4.6

@jasnell jasnell requested review from mcollina and ronag March 28, 2026 03:23
@jasnell jasnell added stream Issues and PRs related to the stream subsystem. experimental Issues and PRs related to experimental features. labels Mar 28, 2026
@nodejs-github-bot
Copy link
Copy Markdown
Collaborator

Review requested:

  • @nodejs/streams

@nodejs-github-bot nodejs-github-bot added lib / src Issues and PRs related to general changes in the lib or src directory. needs-ci PRs that need a full CI run. labels Mar 28, 2026
@jasnell jasnell changed the title Jasnell/new streams classic interop stream: stream/iter node.js streams interop Mar 28, 2026
@codecov
Copy link
Copy Markdown

codecov bot commented Mar 28, 2026

Codecov Report

❌ Patch coverage is 90.05291% with 94 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.70%. Comparing base (b411f90) to head (52280cd).

Files with missing lines Patch % Lines
lib/internal/streams/iter/classic.js 89.33% 84 Missing and 6 partials ⚠️
lib/internal/streams/iter/from.js 78.94% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #62469      +/-   ##
==========================================
- Coverage   89.71%   89.70%   -0.01%     
==========================================
  Files         695      696       +1     
  Lines      214434   215365     +931     
  Branches    41062    41233     +171     
==========================================
+ Hits       192371   193197     +826     
- Misses      14124    14224     +100     
- Partials     7939     7944       +5     
Files with missing lines Coverage Δ
lib/internal/errors.js 97.62% <100.00%> (+<0.01%) ⬆️
lib/internal/streams/iter/pull.js 84.91% <100.00%> (ø)
lib/internal/streams/iter/transform.js 95.78% <100.00%> (ø)
lib/internal/streams/iter/types.js 100.00% <100.00%> (ø)
lib/internal/streams/readable.js 97.22% <100.00%> (+0.06%) ⬆️
lib/internal/streams/writable.js 96.63% <ø> (-0.01%) ⬇️
lib/stream/iter.js 100.00% <100.00%> (ø)
lib/internal/streams/iter/from.js 88.72% <78.94%> (-0.37%) ⬇️
lib/internal/streams/iter/classic.js 89.33% <89.33%> (ø)

... and 27 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@nodejs-github-bot

This comment was marked as outdated.

@nodejs-github-bot

This comment was marked as outdated.

@nodejs-github-bot

This comment was marked as outdated.

Copy link
Copy Markdown
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer if these utilities to be exposed only in the new iter module as a function utilities instead of attaching them to the node stream
prototype

jasnell added 3 commits April 2, 2026 18:54
Signed-off-by: James M Snell <jasnell@gmail.com>
Assisted-by: Opencode/Opus 4.6
Signed-off-by: James M Snell <jasnell@gmail.com>
Assisted-by: Opencode/Opus 4.6
Signed-off-by: James M Snell <jasnell@gmail.com>
Assisted-by: Opencode/Opus 4.6
@jasnell jasnell force-pushed the jasnell/new-streams-classic-interop branch from e16afd4 to fb49804 Compare April 3, 2026 01:54
@jasnell jasnell requested review from guybedford and mcollina April 3, 2026 02:55
@jasnell
Copy link
Copy Markdown
Member Author

jasnell commented Apr 3, 2026

@mcollina @guybedford please take another look

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

experimental Issues and PRs related to experimental features. lib / src Issues and PRs related to general changes in the lib or src directory. needs-ci PRs that need a full CI run. stream Issues and PRs related to the stream subsystem.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants