Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/generators.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ export async function* generate(input, worker) {
};

// Stream chunks as they complete
for await (const chunkResult of worker.stream(input, input, deps)) {
for await (const chunkResult of worker.stream(input, deps)) {
// Process chunk result if needed
yield chunkResult;
}
Expand Down Expand Up @@ -316,7 +316,7 @@ export async function processChunk(fullInput, itemIndices, deps) {
*/
export async function* generate(input, worker) {
// Stream results as workers complete chunks
for await (const chunkResult of worker.stream(input, input, {})) {
for await (const chunkResult of worker.stream(input, {})) {
// Yield immediately - downstream can start processing
yield chunkResult;
}
Expand Down
2 changes: 1 addition & 1 deletion src/generators/ast-js/generate.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export async function* generate(_, worker) {

// Parse the Javascript sources into ASTs in parallel using worker threads
// source is both the items list and the fullInput since we use sliceInput
for await (const chunkResult of worker.stream(files, files)) {
for await (const chunkResult of worker.stream(files)) {
yield chunkResult;
}
}
2 changes: 1 addition & 1 deletion src/generators/ast/generate.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export async function* generate(_, worker) {
);

// Parse markdown files in parallel using worker threads
for await (const chunkResult of worker.stream(files, files)) {
for await (const chunkResult of worker.stream(files)) {
yield chunkResult;
}
}
2 changes: 1 addition & 1 deletion src/generators/jsx-ast/generate.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export async function* generate(input, worker) {
entries: groupedModules.get(head.api),
}));

for await (const chunkResult of worker.stream(entries, entries, docPages)) {
for await (const chunkResult of worker.stream(entries, docPages)) {
yield chunkResult;
}
}
2 changes: 1 addition & 1 deletion src/generators/legacy-html/generate.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ export async function* generate(input, worker) {
}));

// Stream chunks as they complete - HTML files are written immediately
for await (const chunkResult of worker.stream(entries, entries, navigation)) {
for await (const chunkResult of worker.stream(entries, navigation)) {
// Write files for this chunk in the generate method (main thread)
if (config.output) {
for (const template of chunkResult) {
Expand Down
2 changes: 1 addition & 1 deletion src/generators/legacy-json/generate.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export async function* generate(input, worker) {
nodes: groupedModules.get(head.api),
}));

for await (const chunkResult of worker.stream(entries, entries)) {
for await (const chunkResult of worker.stream(entries)) {
if (config.output) {
for (const section of chunkResult) {
const out = join(config.output, `${section.api}.json`);
Expand Down
2 changes: 1 addition & 1 deletion src/generators/metadata/generate.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export async function* generate(inputs, worker) {

// Stream chunks as they complete - allows dependent generators
// to start collecting/preparing while we're still processing
for await (const chunkResult of worker.stream(inputs, inputs, typeMap)) {
for await (const chunkResult of worker.stream(inputs, typeMap)) {
yield chunkResult.flat();
}
}
2 changes: 0 additions & 2 deletions src/generators/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ declare global {
* while upstream chunks are still being processed.
*
* @param items - Items to process (determines chunk distribution)
* @param fullInput - Full input data for context rebuilding in workers
* @param opts - Additional options to pass to workers
* @yields Each chunk's results as they complete
*/
stream<T, R>(
items: T[],
fullInput: T[],
opts?: Record<string, unknown>
): AsyncGenerator<R[], void, unknown>;
}
Expand Down
8 changes: 4 additions & 4 deletions src/threading/__tests__/parallel.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ describe('createParallelWorker', () => {
];

const chunks = await collectChunks(
worker.stream(mockInput, mockInput, { typeMap: {} })
worker.stream(mockInput, { typeMap: {} })
);

strictEqual(chunks.length, 4);
Expand Down Expand Up @@ -121,7 +121,7 @@ describe('createParallelWorker', () => {
];

const chunks = await collectChunks(
worker.stream(mockInput, mockInput, { typeMap: {} })
worker.stream(mockInput, { typeMap: {} })
);

strictEqual(chunks.length, 2);
Expand All @@ -144,7 +144,7 @@ describe('createParallelWorker', () => {
];

const chunks = await collectChunks(
worker.stream(mockInput, mockInput, { typeMap: {} })
worker.stream(mockInput, { typeMap: {} })
);

strictEqual(chunks.length, 1);
Expand Down Expand Up @@ -172,7 +172,7 @@ describe('createParallelWorker', () => {
];

const chunks = await collectChunks(
worker.stream(mockInput, mockInput, { typeMap: {} })
worker.stream(mockInput, { typeMap: {} })
);

strictEqual(chunks.length, 2);
Expand Down
15 changes: 4 additions & 11 deletions src/threading/parallel.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ export default function createParallelWorker(
/**
* Processes items in parallel, yielding results as chunks complete.
*
* @template T, R
* @template T
* @param {T[]} items - Items to process
* @param {T[]} fullInput - Full input for context
* @param {object} extra - Extra options
* @yields {R[]} Chunk results as they complete
*/
async *stream(items, fullInput, extra) {
async *stream(items, extra) {
if (items.length === 0) {
return;
}
Expand All @@ -101,21 +100,15 @@ export default function createParallelWorker(
chunks.map(indices => {
if (runInOneGo) {
const promise = generator
.processChunk(fullInput, indices, extra)
.processChunk(items, indices, extra)
.then(result => ({ promise, result }));

return promise;
}

const promise = pool
.run(
createTask(
fullInput,
indices,
extra,
configuration,
generatorName
)
createTask(items, indices, extra, configuration, generatorName)
)
.then(result => ({ promise, result }));

Expand Down
Loading