Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,17 @@ function fromAsyncGen(fn) {
_resolve({ done: true, cb });
},
destroy(err, cb) {
ac.abort();
ac.abort(err);

// If the source async iterator is waiting for the next write/final
// signal, unblock it so the readable side can observe the abort and
// finish destroying.
if (resolve !== null) {
const _resolve = resolve;
resolve = null;
_resolve({ done: true, cb() {} });
Copy link
Copy Markdown
Contributor

@aduh95 aduh95 Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_resolve({ done: true, cb() {} });
_resolve({ __proto__: null, done: true, cb() {} });

}

cb(err);
},
};
Expand Down
27 changes: 19 additions & 8 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ const {
const { Buffer } = require('buffer');

const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_NULL_VALUES,
} = require('internal/errors').codes;
aggregateTwoErrors,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_NULL_VALUES,
},
} = require('internal/errors');

function from(Readable, iterable, opts) {
let iterator;
Expand Down Expand Up @@ -43,6 +46,7 @@ function from(Readable, iterable, opts) {
// TODO(ronag): What options should be allowed?
...opts,
});
const originalDestroy = readable._destroy;

// Flag to protect against _read
// being called before last iteration completion.
Expand All @@ -64,11 +68,18 @@ function from(Readable, iterable, opts) {
};

readable._destroy = function(error, cb) {
PromisePrototypeThen(
close(error),
() => process.nextTick(cb, error), // nextTick is here in case cb throws
(e) => process.nextTick(cb, e || error),
);
originalDestroy.call(this, error, (destroyError) => {
const combinedError = destroyError || error;
PromisePrototypeThen(
close(combinedError),
// nextTick is here in case cb throws
() => process.nextTick(cb, combinedError),
(closeError) => process.nextTick(
cb,
aggregateTwoErrors(combinedError, closeError),
),
);
});
};

async function close(error) {
Expand Down
20 changes: 20 additions & 0 deletions test/parallel/test-stream-readable-compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@
).then(common.mustCall());
}

{
// Errors from nested `.compose()` calls should propagate instead of hanging.
const stream = Readable.from(['hello'])
.compose(async function *(source) {

Check failure on line 122 in test/parallel/test-stream-readable-compose.js

View workflow job for this annotation

GitHub Actions / lint-js-and-md

This generator function does not have 'yield'
for await (const chunk of source) {
throw new Error(`boom: ${chunk}`);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fix the linter error

Suggested change
throw new Error(`boom: ${chunk}`);
yield chunk;
throw new Error(`boom: ${chunk}`);

}
})
.compose(async function *(source) {
for await (const chunk of source) {
yield chunk;
}
Comment on lines +128 to +130
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for await (const chunk of source) {
yield chunk;
}
yield* source;

});

assert.rejects(
stream.toArray(),
/boom: hello/,
).then(common.mustCall());
}

{
// AbortSignal
const ac = new AbortController();
Expand Down
Loading