Skip to content

⚡ Bolt: Stream indexing results with concurrent task pool#405

Open
AhmmedSamier wants to merge 1 commit intomasterfrom
bolt/indexer-worker-concurrency-985315326601958411
Open

⚡ Bolt: Stream indexing results with concurrent task pool#405
AhmmedSamier wants to merge 1 commit intomasterfrom
bolt/indexer-worker-concurrency-985315326601958411

Conversation

@AhmmedSamier
Copy link
Copy Markdown
Owner

@AhmmedSamier AhmmedSamier commented May 1, 2026

💡 What:
Replaced the for loop and chunk-based Promise.all logic in indexer-worker.ts with a continuous pLimit concurrency task pool. It now tracks totalCompleted across all files and maintains a bounded batch state array, flushing it whenever the batch limit is reached or when all files are completed.

🎯 Why:
The chunk-based Promise.all approach suffers from head-of-line blocking: if a single file in a 25-file chunk is extremely large and slow to parse, the other 24 threads/results sit idle until that specific chunk resolves. This delays the streaming of early parsed results back to the parent thread. Using a pLimit pool keeps maximum concurrent tasks running constantly and streams batches back to the parent without chunk-based bottlenecks.

📊 Impact:
Significantly reduces initial indexing latency and provides smoother UI progress indicators by streamlining result yields, maximizing concurrent throughput instead of bottlenecking on the slowest file per chunk.

🔬 Measurement:
Run the workspace indexer test suite (cd language-server && bun test) and verify the indexing tests complete correctly without hanging. In actual usage, tracking the rate at which result events fire back to the parent thread compared to the previous chunked execution demonstrates faster time-to-first-yield.


PR created automatically by Jules for task 985315326601958411 started by @AhmmedSamier

Summary by CodeRabbit

  • Refactor
    • Optimized indexing performance through improved concurrency handling in file processing.
    • Enhanced progress reporting during large file indexing operations.

…e blocking

Co-authored-by: AhmmedSamier <17784876+AhmmedSamier@users.noreply.github.com>
@google-labs-jules
Copy link
Copy Markdown
Contributor

👋 Jules, reporting for duty! I'm here to lend a hand with this pull request.

When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down.

I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job!

For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with @jules. You can find this option in the Pull Request section of your global Jules UI settings. You can always switch back!

New to Jules? Learn more at jules.google/docs.


For security, I will only act on instructions from the user who triggered this task.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 1, 2026

📝 Walkthrough

Walkthrough

The indexing worker refactored its file processing model from sequential batch-chunked iteration to a concurrency-limited task pool using pLimit(CONCURRENCY). Batch accumulation is now shared across all files with partial result messages posted when batch size thresholds are reached or all files complete, replacing per-chunk posting logic.

Changes

Cohort / File(s) Summary
Concurrency and Batching Optimization
language-server/src/core/indexer-worker.ts
Replaced sequential BATCH_SIZE chunk iteration with Promise.all per chunk with a single concurrency-limited task pool via pLimit(CONCURRENCY). Introduced shared batch accumulation with manual item collection and partial result posting driven by batch size thresholds or completion status rather than chunk boundaries.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 From chunks marching in a line,
To pools of tasks that intertwine!
Batches bloom when full they grow,
Faster indexing—a speedy show! ✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly references the main change: switching to a concurrent task pool for streaming indexing results, which aligns with the core objective of replacing chunked Promise.all with pLimit-based concurrency.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch bolt/indexer-worker-concurrency-985315326601958411

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
Review rate limit: 0/1 reviews remaining, refill in 60 minutes.

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@language-server/src/core/indexer-worker.ts`:
- Around line 40-68: The current eager filePaths.map(limit(...)) approach should
be replaced with a pull-based runner pool to bound memory and fix the any[]
type: change batchItems from any[] to the parser item type (e.g., SymbolInfo[]
or the exact type returned by parser.parseFile) and remove the map/limit
pattern; instead spawn N = Math.min(CONCURRENCY, filePaths.length) async runners
that loop while (true) { const idx = nextIndex++; if (idx >= filePaths.length)
break; try { const items = await parser.parseFile(filePaths[idx]); for (let i =
0; i < items.length; i++) batchItems.push(items[i]); } catch { /* ignore */ }
finally { totalCompleted++; batchProcessedFiles++; if (batchProcessedFiles >=
BATCH_SIZE || totalCompleted === filePaths.length) { parentPort?.postMessage({
type: 'result', items: batchItems, count: batchProcessedFiles, isPartial:
totalCompleted < filePaths.length, }); batchItems = []; batchProcessedFiles = 0;
} } } and await Promise.all(runners) to finish; keep references to
parser.parseFile, batchItems, totalCompleted, batchProcessedFiles, CONCURRENCY
and BATCH_SIZE to locate and update the code.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6da07f1a-7313-4072-bb7d-f2920bef085c

📥 Commits

Reviewing files that changed from the base of the PR and between 32a2408 and 15e9693.

📒 Files selected for processing (1)
  • language-server/src/core/indexer-worker.ts

Comment on lines +40 to +68
await Promise.all(
filePaths.map((filePath) =>
limit(async () => {
try {
return await parser.parseFile(filePath);
const items = await parser.parseFile(filePath);
// ⚡ Bolt: Fast manual loop pushing to avoid Maximum Call Stack Size Exceeded
for (let i = 0; i < items.length; i++) {
batchItems.push(items[i]);
}
} catch {
return [];
// Ignore
} finally {
totalCompleted++;
batchProcessedFiles++;

if (batchProcessedFiles >= BATCH_SIZE || totalCompleted === filePaths.length) {
parentPort?.postMessage({
type: 'result',
items: batchItems,
count: batchProcessedFiles,
isPartial: totalCompleted < filePaths.length,
});
batchItems = [];
batchProcessedFiles = 0;
}
}
}),
);

const chunkItems = results.flat();

// Send back chunk result immediately to keep main thread unblocked but processing
parentPort?.postMessage({
type: 'result',
items: chunkItems,
count: chunk.length,
isPartial: i + BATCH_SIZE < filePaths.length,
});
}
),
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify how many files can be sent to one worker task at a time.
rg -n -C4 '\bassignWorkerTask\s*\(|\bgetWorkerChunkSize\s*\(|filePaths:\s*batchFiles' language-server/src/core/workspace-indexer.ts

Repository: AhmmedSamier/DeepLens

Length of output: 1818


🏁 Script executed:

# Check the actual type declarations in indexer-worker.ts, especially around batchItems
sed -n '1,70p' language-server/src/core/indexer-worker.ts

Repository: AhmmedSamier/DeepLens

Length of output: 2624


🏁 Script executed:

# Get the complete getWorkerChunkSize implementation
sed -n '752,770p' language-server/src/core/workspace-indexer.ts

Repository: AhmmedSamier/DeepLens

Length of output: 695


🏁 Script executed:

# Check CONCURRENCY constant definition
rg -n 'CONCURRENCY|const.*Concurrency' language-server/src/core/indexer-worker.ts

Repository: AhmmedSamier/DeepLens

Length of output: 146


Replace the eager map/limit pattern with a pull-based runner pool to bound memory allocation.

The code violates two guidelines:

  1. Type safety: Line 39 uses any[] for batchItems. This removes type safety from the worker-to-parent message contract and contradicts the guideline "Ensure no new any types are introduced during development". Use an explicit type (e.g., SymbolInfo[] or appropriate item type from the parser).

  2. Memory efficiency: filePaths.map(limit(...)) creates a closure and promise for every file upfront, even though only CONCURRENCY (15) tasks run concurrently. When a worker receives 200+ files (e.g., getWorkerChunkSize returns 60 for batches ≥200), this allocates ~200 closures that remain queued in memory. A pull-based model that spawns only 15 runners pulling files one-by-one keeps queue size O(CONCURRENCY) and reduces GC pressure in this hot path.

The suggested implementation spawns exactly Math.min(CONCURRENCY, filePaths.length) runners, each polling the next file index until exhausted, rather than eagerly mapping all files.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@language-server/src/core/indexer-worker.ts` around lines 40 - 68, The current
eager filePaths.map(limit(...)) approach should be replaced with a pull-based
runner pool to bound memory and fix the any[] type: change batchItems from any[]
to the parser item type (e.g., SymbolInfo[] or the exact type returned by
parser.parseFile) and remove the map/limit pattern; instead spawn N =
Math.min(CONCURRENCY, filePaths.length) async runners that loop while (true) {
const idx = nextIndex++; if (idx >= filePaths.length) break; try { const items =
await parser.parseFile(filePaths[idx]); for (let i = 0; i < items.length; i++)
batchItems.push(items[i]); } catch { /* ignore */ } finally { totalCompleted++;
batchProcessedFiles++; if (batchProcessedFiles >= BATCH_SIZE || totalCompleted
=== filePaths.length) { parentPort?.postMessage({ type: 'result', items:
batchItems, count: batchProcessedFiles, isPartial: totalCompleted <
filePaths.length, }); batchItems = []; batchProcessedFiles = 0; } } } and await
Promise.all(runners) to finish; keep references to parser.parseFile, batchItems,
totalCompleted, batchProcessedFiles, CONCURRENCY and BATCH_SIZE to locate and
update the code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant