diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index beaf942b4b1288..1e26392995e325 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -239,7 +239,17 @@ function fromAsyncGen(fn) { _resolve({ done: true, cb }); }, destroy(err, cb) { - ac.abort(); + ac.abort(err); + + // If the source async iterator is waiting for the next write/final + // signal, unblock it so the readable side can observe the abort and + // finish destroying. + if (resolve !== null) { + const _resolve = resolve; + resolve = null; + _resolve({ done: true, cb() {} }); + } + cb(err); }, }; diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index fad5e68c10d8c4..6a943f02657924 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -8,9 +8,12 @@ const { const { Buffer } = require('buffer'); const { - ERR_INVALID_ARG_TYPE, - ERR_STREAM_NULL_VALUES, -} = require('internal/errors').codes; + aggregateTwoErrors, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_STREAM_NULL_VALUES, + }, +} = require('internal/errors'); function from(Readable, iterable, opts) { let iterator; @@ -43,6 +46,7 @@ function from(Readable, iterable, opts) { // TODO(ronag): What options should be allowed? ...opts, }); + const originalDestroy = readable._destroy; // Flag to protect against _read // being called before last iteration completion. @@ -64,11 +68,18 @@ function from(Readable, iterable, opts) { }; readable._destroy = function(error, cb) { - PromisePrototypeThen( - close(error), - () => process.nextTick(cb, error), // nextTick is here in case cb throws - (e) => process.nextTick(cb, e || error), - ); + originalDestroy.call(this, error, (destroyError) => { + const combinedError = destroyError || error; + PromisePrototypeThen( + close(combinedError), + // nextTick is here in case cb throws + () => process.nextTick(cb, combinedError), + (closeError) => process.nextTick( + cb, + aggregateTwoErrors(combinedError, closeError), + ), + ); + }); }; async function close(error) { diff --git a/test/parallel/test-stream-readable-compose.js b/test/parallel/test-stream-readable-compose.js index cacdfae1c034d6..ca06c9f7735cb0 100644 --- a/test/parallel/test-stream-readable-compose.js +++ b/test/parallel/test-stream-readable-compose.js @@ -116,6 +116,26 @@ const assert = require('assert'); ).then(common.mustCall()); } +{ + // Errors from nested `.compose()` calls should propagate instead of hanging. + const stream = Readable.from(['hello']) + .compose(async function *(source) { + for await (const chunk of source) { + throw new Error(`boom: ${chunk}`); + } + }) + .compose(async function *(source) { + for await (const chunk of source) { + yield chunk; + } + }); + + assert.rejects( + stream.toArray(), + /boom: hello/, + ).then(common.mustCall()); +} + { // AbortSignal const ac = new AbortController();