Skip to content

Commit 0cd35f8

Browse files
committed
stream: apply more stream/iter cleanups
1 parent ef1f82b commit 0cd35f8

2 files changed

Lines changed: 65 additions & 31 deletions

File tree

lib/internal/streams/iter/from.js

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -555,12 +555,7 @@ function from(input) {
555555
);
556556
}
557557

558-
return {
559-
__proto__: null,
560-
async *[SymbolAsyncIterator]() {
561-
yield* normalizeAsyncSource(input);
562-
},
563-
};
558+
return normalizeAsyncSource(input);
564559
}
565560

566561
// =============================================================================

lib/internal/streams/iter/pull.js

Lines changed: 64 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const {
1010
ArrayBufferIsView,
1111
ArrayPrototypePush,
1212
ArrayPrototypeSlice,
13+
PromisePrototypeThen,
1314
SymbolAsyncIterator,
1415
SymbolIterator,
1516
TypedArrayPrototypeGetByteLength,
@@ -350,7 +351,21 @@ function* applyFusedStatelessSyncTransforms(source, run) {
350351
current = result;
351352
}
352353
if (current === null) continue;
353-
yield* processTransformResultSync(current);
354+
// Inline normalization with Uint8Array[] batch as the fast path,
355+
// matching the async pipeline's check order.
356+
if (isUint8ArrayBatch(current)) {
357+
if (current.length > 0) yield current;
358+
} else if (isUint8Array(current)) {
359+
yield [current];
360+
} else if (typeof current === 'string') {
361+
yield [toUint8Array(current)];
362+
} else if (isAnyArrayBuffer(current)) {
363+
yield [new Uint8Array(current)];
364+
} else if (ArrayBufferIsView(current)) {
365+
yield [primitiveToUint8Array(current)];
366+
} else {
367+
yield* processTransformResultSync(current);
368+
}
354369
}
355370
// Flush
356371
let current = null;
@@ -394,8 +409,7 @@ function* applyStatefulSyncTransform(source, transform) {
394409
* @yields {Uint8Array[]}
395410
*/
396411
function* createSyncPipeline(source, transforms) {
397-
// Normalize source via fromSync()
398-
let current = fromSync(source);
412+
let current = source;
399413

400414
// Apply transforms - fuse consecutive stateless transforms into a single
401415
// generator layer to avoid unnecessary generator ticks.
@@ -553,8 +567,7 @@ async function* createAsyncPipeline(source, transforms, signal) {
553567
// Check for abort
554568
signal?.throwIfAborted();
555569

556-
// Normalize source via from()
557-
const normalized = from(source);
570+
const normalized = source;
558571

559572
// Fast path: no transforms, just yield normalized source directly
560573
if (transforms.length === 0) {
@@ -661,7 +674,7 @@ function pullSync(source, ...transforms) {
661674
return {
662675
__proto__: null,
663676
*[SymbolIterator]() {
664-
yield* createSyncPipeline(source, transforms);
677+
yield* createSyncPipeline(fromSync(source), transforms);
665678
},
666679
};
667680
}
@@ -693,7 +706,7 @@ function pull(source, ...args) {
693706
return {
694707
__proto__: null,
695708
async *[SymbolAsyncIterator]() {
696-
yield* createAsyncPipeline(source, transforms, signal);
709+
yield* createAsyncPipeline(from(source), transforms, signal);
697710
},
698711
};
699712
}
@@ -792,28 +805,50 @@ async function pipeTo(source, ...args) {
792805
const hasWriteSync = typeof writer.writeSync === 'function';
793806
const hasWritevSync = typeof writer.writevSync === 'function';
794807
const hasEndSync = typeof writer.endSync === 'function';
808+
// Async fallback for writeBatch when sync write fails partway through.
809+
// Continues writing from batch[startIndex] using async write().
810+
async function writeBatchAsyncFallback(batch, startIndex) {
811+
for (let i = startIndex; i < batch.length; i++) {
812+
const chunk = batch[i];
813+
if (hasWriteSync && writer.writeSync(chunk)) {
814+
// Sync retry succeeded
815+
} else {
816+
const result = writer.write(
817+
chunk, signal ? { __proto__: null, signal } : undefined);
818+
if (result !== undefined) {
819+
await result;
820+
}
821+
}
822+
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
823+
}
824+
}
825+
795826
// Write a batch using try-fallback: sync first, async if needed.
796-
async function writeBatch(batch) {
827+
// Returns undefined on sync success, or a Promise when async fallback
828+
// is required. Callers must check: const p = writeBatch(b); if (p) await p;
829+
function writeBatch(batch) {
797830
if (hasWritev && batch.length > 1) {
798831
if (!hasWritevSync || !writer.writevSync(batch)) {
799-
await writer.writev(batch, signal ? { __proto__: null, signal } :
800-
undefined);
832+
const opts = signal ? { __proto__: null, signal } : undefined;
833+
return PromisePrototypeThen(writer.writev(batch, opts), () => {
834+
for (let i = 0; i < batch.length; i++) {
835+
totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
836+
}
837+
});
801838
}
802839
for (let i = 0; i < batch.length; i++) {
803840
totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
804841
}
805-
} else {
806-
for (let i = 0; i < batch.length; i++) {
807-
const chunk = batch[i];
808-
if (!hasWriteSync || !writer.writeSync(chunk)) {
809-
const result = writer.write(
810-
chunk, signal ? { __proto__: null, signal } : undefined);
811-
if (result !== undefined) {
812-
await result;
813-
}
814-
}
815-
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
842+
return;
843+
}
844+
for (let i = 0; i < batch.length; i++) {
845+
const chunk = batch[i];
846+
if (!hasWriteSync || !writer.writeSync(chunk)) {
847+
// Sync path failed at index i - fall back to async for the rest.
848+
// Count bytes for chunks already written synchronously (0..i-1).
849+
return writeBatchAsyncFallback(batch, i);
816850
}
851+
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
817852
}
818853
}
819854

@@ -823,11 +858,13 @@ async function pipeTo(source, ...args) {
823858
if (signal) {
824859
for await (const batch of normalized) {
825860
signal.throwIfAborted();
826-
await writeBatch(batch);
861+
const p = writeBatch(batch);
862+
if (p) await p;
827863
}
828864
} else {
829865
for await (const batch of normalized) {
830-
await writeBatch(batch);
866+
const p = writeBatch(batch);
867+
if (p) await p;
831868
}
832869
}
833870
} else {
@@ -836,11 +873,13 @@ async function pipeTo(source, ...args) {
836873
if (signal) {
837874
for await (const batch of pipeline) {
838875
signal.throwIfAborted();
839-
await writeBatch(batch);
876+
const p = writeBatch(batch);
877+
if (p) await p;
840878
}
841879
} else {
842880
for await (const batch of pipeline) {
843-
await writeBatch(batch);
881+
const p = writeBatch(batch);
882+
if (p) await p;
844883
}
845884
}
846885
}

0 commit comments

Comments
 (0)