From 4fb963743b8224b9d9c293ffd6ea8670bfb38419 Mon Sep 17 00:00:00 2001 From: Aviv Keller Date: Mon, 9 Mar 2026 16:05:14 -0400 Subject: [PATCH] feat(worker): simplify input passing --- docs/generators.md | 4 ++-- src/generators/ast-js/generate.mjs | 2 +- src/generators/ast/generate.mjs | 2 +- src/generators/jsx-ast/generate.mjs | 2 +- src/generators/legacy-html/generate.mjs | 2 +- src/generators/legacy-json/generate.mjs | 2 +- src/generators/metadata/generate.mjs | 2 +- src/generators/types.d.ts | 2 -- src/threading/__tests__/parallel.test.mjs | 8 ++++---- src/threading/parallel.mjs | 15 ++++----------- 10 files changed, 16 insertions(+), 25 deletions(-) diff --git a/docs/generators.md b/docs/generators.md index d97ad495..b7b47041 100644 --- a/docs/generators.md +++ b/docs/generators.md @@ -241,7 +241,7 @@ export async function* generate(input, worker) { }; // Stream chunks as they complete - for await (const chunkResult of worker.stream(input, input, deps)) { + for await (const chunkResult of worker.stream(input, deps)) { // Process chunk result if needed yield chunkResult; } @@ -316,7 +316,7 @@ export async function processChunk(fullInput, itemIndices, deps) { */ export async function* generate(input, worker) { // Stream results as workers complete chunks - for await (const chunkResult of worker.stream(input, input, {})) { + for await (const chunkResult of worker.stream(input, {})) { // Yield immediately - downstream can start processing yield chunkResult; } diff --git a/src/generators/ast-js/generate.mjs b/src/generators/ast-js/generate.mjs index 4cbb9f68..303e5ec0 100644 --- a/src/generators/ast-js/generate.mjs +++ b/src/generators/ast-js/generate.mjs @@ -50,7 +50,7 @@ export async function* generate(_, worker) { // Parse the Javascript sources into ASTs in parallel using worker threads // source is both the items list and the fullInput since we use sliceInput - for await (const chunkResult of worker.stream(files, files)) { + for await (const chunkResult of worker.stream(files)) { yield chunkResult; } } diff --git a/src/generators/ast/generate.mjs b/src/generators/ast/generate.mjs index d5574019..dcb3d541 100644 --- a/src/generators/ast/generate.mjs +++ b/src/generators/ast/generate.mjs @@ -52,7 +52,7 @@ export async function* generate(_, worker) { ); // Parse markdown files in parallel using worker threads - for await (const chunkResult of worker.stream(files, files)) { + for await (const chunkResult of worker.stream(files)) { yield chunkResult; } } diff --git a/src/generators/jsx-ast/generate.mjs b/src/generators/jsx-ast/generate.mjs index ea55af76..9d9ab5c6 100644 --- a/src/generators/jsx-ast/generate.mjs +++ b/src/generators/jsx-ast/generate.mjs @@ -61,7 +61,7 @@ export async function* generate(input, worker) { entries: groupedModules.get(head.api), })); - for await (const chunkResult of worker.stream(entries, entries, docPages)) { + for await (const chunkResult of worker.stream(entries, docPages)) { yield chunkResult; } } diff --git a/src/generators/legacy-html/generate.mjs b/src/generators/legacy-html/generate.mjs index fe39aec6..b60131ec 100644 --- a/src/generators/legacy-html/generate.mjs +++ b/src/generators/legacy-html/generate.mjs @@ -123,7 +123,7 @@ export async function* generate(input, worker) { })); // Stream chunks as they complete - HTML files are written immediately - for await (const chunkResult of worker.stream(entries, entries, navigation)) { + for await (const chunkResult of worker.stream(entries, navigation)) { // Write files for this chunk in the generate method (main thread) if (config.output) { for (const template of chunkResult) { diff --git a/src/generators/legacy-json/generate.mjs b/src/generators/legacy-json/generate.mjs index 4e5972d6..d5f5fb82 100644 --- a/src/generators/legacy-json/generate.mjs +++ b/src/generators/legacy-json/generate.mjs @@ -49,7 +49,7 @@ export async function* generate(input, worker) { nodes: groupedModules.get(head.api), })); - for await (const chunkResult of worker.stream(entries, entries)) { + for await (const chunkResult of worker.stream(entries)) { if (config.output) { for (const section of chunkResult) { const out = join(config.output, `${section.api}.json`); diff --git a/src/generators/metadata/generate.mjs b/src/generators/metadata/generate.mjs index d3548141..149aacb6 100644 --- a/src/generators/metadata/generate.mjs +++ b/src/generators/metadata/generate.mjs @@ -32,7 +32,7 @@ export async function* generate(inputs, worker) { // Stream chunks as they complete - allows dependent generators // to start collecting/preparing while we're still processing - for await (const chunkResult of worker.stream(inputs, inputs, typeMap)) { + for await (const chunkResult of worker.stream(inputs, typeMap)) { yield chunkResult.flat(); } } diff --git a/src/generators/types.d.ts b/src/generators/types.d.ts index f2fc03d7..6374d800 100644 --- a/src/generators/types.d.ts +++ b/src/generators/types.d.ts @@ -18,13 +18,11 @@ declare global { * while upstream chunks are still being processed. * * @param items - Items to process (determines chunk distribution) - * @param fullInput - Full input data for context rebuilding in workers * @param opts - Additional options to pass to workers * @yields Each chunk's results as they complete */ stream( items: T[], - fullInput: T[], opts?: Record ): AsyncGenerator; } diff --git a/src/threading/__tests__/parallel.test.mjs b/src/threading/__tests__/parallel.test.mjs index 41b04c6f..324b0960 100644 --- a/src/threading/__tests__/parallel.test.mjs +++ b/src/threading/__tests__/parallel.test.mjs @@ -90,7 +90,7 @@ describe('createParallelWorker', () => { ]; const chunks = await collectChunks( - worker.stream(mockInput, mockInput, { typeMap: {} }) + worker.stream(mockInput, { typeMap: {} }) ); strictEqual(chunks.length, 4); @@ -121,7 +121,7 @@ describe('createParallelWorker', () => { ]; const chunks = await collectChunks( - worker.stream(mockInput, mockInput, { typeMap: {} }) + worker.stream(mockInput, { typeMap: {} }) ); strictEqual(chunks.length, 2); @@ -144,7 +144,7 @@ describe('createParallelWorker', () => { ]; const chunks = await collectChunks( - worker.stream(mockInput, mockInput, { typeMap: {} }) + worker.stream(mockInput, { typeMap: {} }) ); strictEqual(chunks.length, 1); @@ -172,7 +172,7 @@ describe('createParallelWorker', () => { ]; const chunks = await collectChunks( - worker.stream(mockInput, mockInput, { typeMap: {} }) + worker.stream(mockInput, { typeMap: {} }) ); strictEqual(chunks.length, 2); diff --git a/src/threading/parallel.mjs b/src/threading/parallel.mjs index 6627f0f0..d74ba92e 100644 --- a/src/threading/parallel.mjs +++ b/src/threading/parallel.mjs @@ -76,13 +76,12 @@ export default function createParallelWorker( /** * Processes items in parallel, yielding results as chunks complete. * - * @template T, R + * @template T * @param {T[]} items - Items to process - * @param {T[]} fullInput - Full input for context * @param {object} extra - Extra options * @yields {R[]} Chunk results as they complete */ - async *stream(items, fullInput, extra) { + async *stream(items, extra) { if (items.length === 0) { return; } @@ -101,7 +100,7 @@ export default function createParallelWorker( chunks.map(indices => { if (runInOneGo) { const promise = generator - .processChunk(fullInput, indices, extra) + .processChunk(items, indices, extra) .then(result => ({ promise, result })); return promise; @@ -109,13 +108,7 @@ export default function createParallelWorker( const promise = pool .run( - createTask( - fullInput, - indices, - extra, - configuration, - generatorName - ) + createTask(items, indices, extra, configuration, generatorName) ) .then(result => ({ promise, result }));